reifydb_store_multi/store/
mod.rs1use std::{ops::Deref, sync::Arc, time::Duration};
5
6use reifydb_core::event::EventBus;
7#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
8use reifydb_core::event::metric::MultiCommittedEvent;
9use reifydb_runtime::{
10 actor::{mailbox::ActorRef, system::ActorSystem},
11 context::clock::Clock,
12 pool::{PoolConfig, Pools},
13 sync::waiter::WaiterHandle,
14};
15use tracing::instrument;
16
17use crate::{
18 BufferConfig, buffer::tier::MultiBufferTier, config::MultiStoreConfig, flush::actor::FlushMessage,
19 persistent::MultiPersistentTier,
20};
21#[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
22use crate::{
23 config::PersistentConfig,
24 flush::{actor::FlushActor, listener::FlushEventListener},
25};
26
27pub mod drop;
28pub mod multi;
29pub mod router;
30pub mod version;
31pub mod worker;
32
33use reifydb_core::actors::drop::DropMessage;
34use worker::{DropActor, DropWorkerConfig};
35
36use crate::Result;
37
/// Shared handle to a multi-tier store.
///
/// This is a newtype over `Arc<StandardMultiStoreInner>`: cloning the handle
/// clones the `Arc`, so every clone refers to the same inner state. The handle
/// dereferences to [`StandardMultiStoreInner`].
#[derive(Clone)]
pub struct StandardMultiStore(Arc<StandardMultiStoreInner>);
40
/// State shared by all clones of a [`StandardMultiStore`].
pub struct StandardMultiStoreInner {
    /// Buffer tier storage taken from the store config, if one was configured.
    pub(crate) buffer: Option<MultiBufferTier>,
    /// Persistent tier storage. `StandardMultiStore::new` only fills this in on
    /// builds with the `sqlite` feature that do not target `wasm32`; on every
    /// other build it is always `None`.
    pub(crate) persistent: Option<MultiPersistentTier>,

    /// Handle to the drop actor, spawned by `StandardMultiStore::new` only when
    /// a buffer tier is present.
    pub(crate) drop_actor: Option<ActorRef<DropMessage>>,

    /// Handle to the flush actor, spawned by `StandardMultiStore::new` only when
    /// both a buffer tier and a persistent tier are present.
    // NOTE(review): `flush_pending_blocking` reads this field, so the
    // `dead_code` allowance below may be stale — confirm before removing it.
    #[allow(dead_code)]
    pub(crate) flush_actor: Option<ActorRef<FlushMessage>>,

    /// Clone of the actor system the actors above were spawned on.
    // Presumably held only to keep the actor system alive for as long as the
    // store exists — TODO confirm against `ActorSystem`'s drop behaviour.
    _actor_system: ActorSystem,

    /// Event bus taken from the store config. On `sqlite` builds the flush
    /// listener is registered on it, and `flush_pending_blocking` waits for it
    /// to complete before requesting a flush.
    pub(crate) event_bus: EventBus,
}
54
55impl StandardMultiStore {
56 #[instrument(name = "store::multi::new", level = "debug", skip(config), fields(
57 has_buffer = config.buffer.is_some(),
58 has_persistent = config.persistent.is_some(),
59 ))]
60 pub fn new(config: MultiStoreConfig) -> Result<Self> {
61 let buffer = config.buffer.map(|c| c.storage);
62
63 let actor_system = config.actor_system.clone();
64
65 let drop_actor = buffer.as_ref().map(|storage| {
66 let drop_config = DropWorkerConfig::default();
67 DropActor::spawn(
68 &actor_system,
69 drop_config,
70 storage.clone(),
71 config.event_bus.clone(),
72 config.clock,
73 )
74 });
75
76 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
77 let (persistent, flush_actor) = {
78 let persistent_config = config.persistent.clone();
79 let persistent = persistent_config.as_ref().map(|c| c.storage.clone());
80 let flush_actor = match (buffer.as_ref(), persistent.as_ref(), persistent_config.as_ref()) {
81 (Some(buf), Some(persistent_storage), Some(persistent_cfg)) => {
82 let actor_ref = FlushActor::spawn(
83 &actor_system,
84 buf.clone(),
85 persistent_storage.clone(),
86 persistent_cfg.flush_interval,
87 );
88 config.event_bus.register::<MultiCommittedEvent, _>(FlushEventListener::new(
89 actor_ref.clone(),
90 ));
91 Some(actor_ref)
92 }
93 _ => None,
94 };
95 (persistent, flush_actor)
96 };
97
98 #[cfg(not(all(feature = "sqlite", not(target_arch = "wasm32"))))]
99 let (persistent, flush_actor): (Option<MultiPersistentTier>, Option<ActorRef<FlushMessage>>) = {
100 let _ = config.persistent;
101 (None, None)
102 };
103
104 Ok(Self(Arc::new(StandardMultiStoreInner {
105 buffer,
106 persistent,
107 drop_actor,
108 flush_actor,
109 _actor_system: actor_system,
110 event_bus: config.event_bus,
111 })))
112 }
113
114 pub fn buffer(&self) -> Option<&MultiBufferTier> {
115 self.buffer.as_ref()
116 }
117
118 pub fn persistent(&self) -> Option<&MultiPersistentTier> {
119 self.persistent.as_ref()
120 }
121
122 pub fn flush_pending_blocking(&self) {
123 let Some(actor_ref) = self.flush_actor.as_ref() else {
124 return;
125 };
126
127 self.event_bus.wait_for_completion();
128
129 let waiter = Arc::new(WaiterHandle::new());
130 let waiter_for_msg = Arc::clone(&waiter);
131 if actor_ref
132 .send_blocking(FlushMessage::FlushPending {
133 waiter: waiter_for_msg,
134 })
135 .is_err()
136 {
137 return;
138 }
139
140 waiter.wait_timeout(Duration::from_secs(60));
141 }
142}
143
144impl Deref for StandardMultiStore {
145 type Target = StandardMultiStoreInner;
146
147 fn deref(&self) -> &Self::Target {
148 &self.0
149 }
150}
151
152impl StandardMultiStore {
153 pub fn testing_memory() -> Self {
154 let pools = Pools::new(PoolConfig::sync_only());
155 let actor_system = ActorSystem::new(pools, Clock::Real);
156 Self::testing_memory_with_eventbus(EventBus::new(&actor_system))
157 }
158
159 pub fn testing_memory_with_eventbus(event_bus: EventBus) -> Self {
160 let pools = Pools::new(PoolConfig::sync_only());
161 let actor_system = ActorSystem::new(pools, Clock::Real);
162 Self::new(MultiStoreConfig {
163 buffer: Some(BufferConfig {
164 storage: MultiBufferTier::memory(),
165 }),
166 persistent: None,
167 retention: Default::default(),
168 merge_config: Default::default(),
169 event_bus,
170 actor_system,
171 clock: Clock::Real,
172 })
173 .unwrap()
174 }
175
176 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
177 pub fn testing_memory_with_persistent_sqlite() -> Self {
178 let pools = Pools::new(PoolConfig::default());
179 let actor_system = ActorSystem::new(pools, Clock::Real);
180 Self::testing_memory_with_persistent_sqlite_with_eventbus(EventBus::new(&actor_system))
181 }
182
183 #[cfg(all(feature = "sqlite", not(target_arch = "wasm32")))]
184 pub fn testing_memory_with_persistent_sqlite_with_eventbus(event_bus: EventBus) -> Self {
185 let pools = Pools::new(PoolConfig::default());
186 let actor_system = ActorSystem::new(pools, Clock::Real);
187 Self::new(MultiStoreConfig {
188 buffer: Some(BufferConfig {
189 storage: MultiBufferTier::memory(),
190 }),
191 persistent: Some(PersistentConfig::sqlite_in_memory()),
192 retention: Default::default(),
193 merge_config: Default::default(),
194 event_bus,
195 actor_system,
196 clock: Clock::Real,
197 })
198 .unwrap()
199 }
200}