Skip to main content

reifydb_store_multi/store/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_core::event::EventBus;
7#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
8use reifydb_core::event::metric::MultiCommittedEvent;
9use reifydb_runtime::{
10	actor::{mailbox::ActorRef, system::ActorSystem},
11	context::clock::Clock,
12	pool::{PoolConfig, Pools},
13	sync::waiter::WaiterHandle,
14};
15use tracing::instrument;
16
17use crate::{
18	BufferConfig, buffer::tier::MultiBufferTier, config::MultiStoreConfig, flush::actor::FlushMessage,
19	persistent::MultiPersistentTier,
20};
21#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
22use crate::{
23	config::PersistentConfig,
24	flush::{actor::FlushActor, listener::FlushEventListener},
25};
26
27pub mod drop;
28pub mod multi;
29pub mod router;
30pub mod version;
31pub mod worker;
32
33use reifydb_core::actors::drop::DropMessage;
34use worker::{DropActor, DropWorkerConfig};
35
36use crate::Result;
37
38#[derive(Clone)]
39pub struct StandardMultiStore(Arc<StandardMultiStoreInner>);
40
41pub struct StandardMultiStoreInner {
42	pub(crate) buffer: Option<MultiBufferTier>,
43	pub(crate) persistent: Option<MultiPersistentTier>,
44
45	pub(crate) drop_actor: Option<ActorRef<DropMessage>>,
46
47	#[allow(dead_code)]
48	pub(crate) flush_actor: Option<ActorRef<FlushMessage>>,
49
50	_actor_system: ActorSystem,
51
52	pub(crate) event_bus: EventBus,
53}
54
55impl StandardMultiStore {
56	#[instrument(name = "store::multi::new", level = "debug", skip(config), fields(
57		has_buffer = config.buffer.is_some(),
58		has_persistent = config.persistent.is_some(),
59	))]
60	pub fn new(config: MultiStoreConfig) -> Result<Self> {
61		let buffer = config.buffer.map(|c| c.storage);
62
63		let actor_system = config.actor_system.clone();
64
65		let drop_actor = buffer.as_ref().map(|storage| {
66			let drop_config = DropWorkerConfig::default();
67			DropActor::spawn(
68				&actor_system,
69				drop_config,
70				storage.clone(),
71				config.event_bus.clone(),
72				config.clock,
73			)
74		});
75
76		#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
77		let (persistent, flush_actor) = {
78			let persistent_config = config.persistent.clone();
79			let persistent = persistent_config.as_ref().map(|c| c.storage.clone());
80			let flush_actor = match (buffer.as_ref(), persistent.as_ref(), persistent_config.as_ref()) {
81				(Some(buf), Some(persistent_storage), Some(persistent_cfg)) => {
82					let actor_ref = FlushActor::spawn(
83						&actor_system,
84						buf.clone(),
85						persistent_storage.clone(),
86						persistent_cfg.flush_interval,
87					);
88					config.event_bus.register::<MultiCommittedEvent, _>(FlushEventListener::new(
89						actor_ref.clone(),
90					));
91					Some(actor_ref)
92				}
93				_ => None,
94			};
95			(persistent, flush_actor)
96		};
97
98		#[cfg(not(all(feature = "sqlite", not(target_arch = "wasm32"))))]
99		let (persistent, flush_actor): (Option<MultiPersistentTier>, Option<ActorRef<FlushMessage>>) = {
100			let _ = config.persistent;
101			(None, None)
102		};
103
104		Ok(Self(Arc::new(StandardMultiStoreInner {
105			buffer,
106			persistent,
107			drop_actor,
108			flush_actor,
109			_actor_system: actor_system,
110			event_bus: config.event_bus,
111		})))
112	}
113
114	pub fn buffer(&self) -> Option<&MultiBufferTier> {
115		self.buffer.as_ref()
116	}
117
118	pub fn persistent(&self) -> Option<&MultiPersistentTier> {
119		self.persistent.as_ref()
120	}
121
122	pub fn flush_pending_blocking(&self) {
123		let Some(actor_ref) = self.flush_actor.as_ref() else {
124			return;
125		};
126
127		self.event_bus.wait_for_completion();
128
129		let waiter = Arc::new(WaiterHandle::new());
130		let waiter_for_msg = Arc::clone(&waiter);
131		if actor_ref
132			.send_blocking(FlushMessage::FlushPending {
133				waiter: waiter_for_msg,
134			})
135			.is_err()
136		{
137			return;
138		}
139
140		waiter.wait_timeout(Duration::from_secs(60));
141	}
142}
143
144impl Deref for StandardMultiStore {
145	type Target = StandardMultiStoreInner;
146
147	fn deref(&self) -> &Self::Target {
148		&self.0
149	}
150}
151
152impl StandardMultiStore {
153	pub fn testing_memory() -> Self {
154		let pools = Pools::new(PoolConfig::sync_only());
155		let actor_system = ActorSystem::new(pools, Clock::Real);
156		Self::testing_memory_with_eventbus(EventBus::new(&actor_system))
157	}
158
159	pub fn testing_memory_with_eventbus(event_bus: EventBus) -> Self {
160		let pools = Pools::new(PoolConfig::sync_only());
161		let actor_system = ActorSystem::new(pools, Clock::Real);
162		Self::new(MultiStoreConfig {
163			buffer: Some(BufferConfig {
164				storage: MultiBufferTier::memory(),
165			}),
166			persistent: None,
167			retention: Default::default(),
168			merge_config: Default::default(),
169			event_bus,
170			actor_system,
171			clock: Clock::Real,
172		})
173		.unwrap()
174	}
175
176	#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
177	pub fn testing_memory_with_persistent_sqlite() -> Self {
178		let pools = Pools::new(PoolConfig::default());
179		let actor_system = ActorSystem::new(pools, Clock::Real);
180		Self::testing_memory_with_persistent_sqlite_with_eventbus(EventBus::new(&actor_system))
181	}
182
183	#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
184	pub fn testing_memory_with_persistent_sqlite_with_eventbus(event_bus: EventBus) -> Self {
185		let pools = Pools::new(PoolConfig::default());
186		let actor_system = ActorSystem::new(pools, Clock::Real);
187		Self::new(MultiStoreConfig {
188			buffer: Some(BufferConfig {
189				storage: MultiBufferTier::memory(),
190			}),
191			persistent: Some(PersistentConfig::sqlite_in_memory()),
192			retention: Default::default(),
193			merge_config: Default::default(),
194			event_bus,
195			actor_system,
196			clock: Clock::Real,
197		})
198		.unwrap()
199	}
200}