allsource_core/infrastructure/persistence/
system_bootstrap.rs1#[cfg(feature = "server")]
2use crate::infrastructure::repositories::EventSourcedAuthRepository;
3use crate::{
4 application::services::consumer::ConsumerRegistry,
5 domain::{entities::TenantQuotas, repositories::TenantRepository, value_objects::TenantId},
6 error::Result,
7 infrastructure::{
8 persistence::SystemMetadataStore,
9 repositories::{
10 EventSourcedAuditRepository, EventSourcedConfigRepository, EventSourcedTenantRepository,
11 },
12 },
13};
14use std::{path::PathBuf, sync::Arc};
15
16pub struct SystemRepositories {
22 pub system_store: Arc<SystemMetadataStore>,
24
25 pub tenant_repository: Arc<EventSourcedTenantRepository>,
27
28 pub audit_repository: Arc<EventSourcedAuditRepository>,
30
31 pub config_repository: Arc<EventSourcedConfigRepository>,
33
34 #[cfg(feature = "server")]
36 pub auth_repository: Arc<EventSourcedAuthRepository>,
37
38 pub consumer_registry: Arc<ConsumerRegistry>,
40}
41
/// Stateless namespace for the system-metadata bootstrap routines.
///
/// The type carries no data; its associated functions (`initialize` and
/// `try_initialize`) build a [`SystemRepositories`] bundle from a system data
/// directory.
pub struct SystemBootstrap;
60
61impl SystemBootstrap {
62 #[cfg_attr(feature = "hotpath", hotpath::measure)]
71 pub async fn initialize(
72 system_data_dir: PathBuf,
73 bootstrap_tenant: Option<String>,
74 ) -> Result<SystemRepositories> {
75 tracing::info!(
77 "Stage 1: Initializing system metadata store at {}",
78 system_data_dir.display()
79 );
80 let system_store = Arc::new(SystemMetadataStore::new(&system_data_dir)?);
81 tracing::info!(
82 "System store initialized: {} events recovered",
83 system_store.total_events()
84 );
85
86 tracing::info!("Stage 2: Replaying system streams into repository caches");
88 let tenant_repository = Arc::new(EventSourcedTenantRepository::new(system_store.clone()));
89 let audit_repository = Arc::new(EventSourcedAuditRepository::new(system_store.clone()));
90 let config_repository = Arc::new(EventSourcedConfigRepository::new(system_store.clone()));
91 #[cfg(feature = "server")]
92 let auth_repository = Arc::new(EventSourcedAuthRepository::new(system_store.clone()));
93 let consumer_registry = Arc::new(ConsumerRegistry::new_durable(system_store.clone()));
94
95 let tenant_count = tenant_repository.count().await.unwrap_or(0);
96 #[cfg(feature = "server")]
97 tracing::info!(
98 "System caches populated: {} tenants, {} config entries, {} API keys ({} active), {} consumers",
99 tenant_count,
100 config_repository.count(),
101 auth_repository.count(),
102 auth_repository.active_count(),
103 consumer_registry.count()
104 );
105 #[cfg(not(feature = "server"))]
106 tracing::info!(
107 "System caches populated: {} tenants, {} config entries, {} consumers",
108 tenant_count,
109 config_repository.count(),
110 consumer_registry.count()
111 );
112
113 if tenant_count == 0 {
115 if let Some(ref tenant_name) = bootstrap_tenant {
116 tracing::info!(
117 "Stage 3: First boot detected — creating default tenant '{}'",
118 tenant_name
119 );
120
121 let tenant_id_str = tenant_name
123 .to_lowercase()
124 .replace(' ', "-")
125 .chars()
126 .filter(|c| c.is_alphanumeric() || *c == '-' || *c == '_')
127 .collect::<String>();
128
129 match TenantId::new(tenant_id_str) {
130 Ok(tenant_id) => {
131 match tenant_repository
132 .create(tenant_id, tenant_name.clone(), TenantQuotas::unlimited())
133 .await
134 {
135 Ok(_) => {
136 tracing::info!("Default tenant '{}' created", tenant_name);
137 }
138 Err(e) => {
139 tracing::warn!("Failed to create default tenant: {}", e);
140 }
141 }
142 }
143 Err(e) => {
144 tracing::warn!("Invalid default tenant ID: {}", e);
145 }
146 }
147 } else {
148 tracing::info!(
149 "Stage 3: No bootstrap tenant configured, skipping first-boot setup"
150 );
151 }
152 } else {
153 tracing::info!("Stage 3: Skipped — {} existing tenants found", tenant_count);
154 }
155
156 tracing::info!("Stage 4: System metadata initialized — ready to accept traffic");
158
159 Ok(SystemRepositories {
160 system_store,
161 tenant_repository,
162 audit_repository,
163 config_repository,
164 #[cfg(feature = "server")]
165 auth_repository,
166 consumer_registry,
167 })
168 }
169
170 pub async fn try_initialize(
176 system_data_dir: Option<PathBuf>,
177 bootstrap_tenant: Option<String>,
178 ) -> Option<SystemRepositories> {
179 if let Some(dir) = system_data_dir {
180 match Self::initialize(dir, bootstrap_tenant).await {
181 Ok(repos) => Some(repos),
182 Err(e) => {
183 tracing::error!(
184 "Failed to initialize system metadata store: {}. \
185 Falling back to in-memory repositories.",
186 e
187 );
188 None
189 }
190 }
191 } else {
192 tracing::info!(
193 "No system_data_dir configured. Using in-memory repositories for metadata."
194 );
195 None
196 }
197 }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a scratch directory and returns it together with the
    /// `__system` path inside it.
    ///
    /// The returned `TempDir` guard must be kept alive for as long as the
    /// path is in use.
    fn scratch_system_dir() -> (TempDir, std::path::PathBuf) {
        let guard = TempDir::new().unwrap();
        let system_dir = guard.path().join("__system");
        (guard, system_dir)
    }

    /// A fresh directory with no bootstrap tenant yields empty repositories.
    #[tokio::test]
    async fn test_staged_initialization_empty() {
        let (_guard, system_dir) = scratch_system_dir();

        let repos = SystemBootstrap::initialize(system_dir, None).await.unwrap();

        let tenants = repos.tenant_repository.count().await.unwrap();
        assert_eq!(tenants, 0);
        assert_eq!(repos.config_repository.count(), 0);
    }

    /// First boot with a bootstrap tenant creates it under the derived ID.
    #[tokio::test]
    async fn test_staged_initialization_with_bootstrap_tenant() {
        let (_guard, system_dir) = scratch_system_dir();
        let bootstrap_name = Some("Default Tenant".to_string());

        let repos = SystemBootstrap::initialize(system_dir, bootstrap_name)
            .await
            .unwrap();

        let tenants = repos.tenant_repository.count().await.unwrap();
        assert_eq!(tenants, 1);

        // "Default Tenant" is expected to be stored under "default-tenant".
        let expected_id = TenantId::new("default-tenant".to_string()).unwrap();
        let found = repos
            .tenant_repository
            .find_by_id(&expected_id)
            .await
            .unwrap();
        assert!(found.is_some());
        assert_eq!(found.unwrap().name(), "Default Tenant");
    }

    /// State written during one boot is visible after re-initializing from
    /// the same directory.
    #[tokio::test]
    async fn test_staged_initialization_recovery() {
        let (_guard, system_dir) = scratch_system_dir();

        // First boot: create a tenant and write one config entry.
        let first_boot =
            SystemBootstrap::initialize(system_dir.clone(), Some("ACME Corp".to_string()))
                .await
                .unwrap();
        assert_eq!(first_boot.tenant_repository.count().await.unwrap(), 1);
        first_boot
            .config_repository
            .set("feature_flag", serde_json::json!(true), None)
            .unwrap();
        // Release every handle before re-opening the same directory.
        drop(first_boot);

        // Second boot: both the tenant and the config entry must be recovered.
        let second_boot = SystemBootstrap::initialize(system_dir, None).await.unwrap();
        assert_eq!(second_boot.tenant_repository.count().await.unwrap(), 1);
        assert_eq!(second_boot.config_repository.count(), 1);
        assert_eq!(
            second_boot
                .config_repository
                .get_value("feature_flag")
                .unwrap(),
            serde_json::json!(true)
        );
    }

    /// Without a configured directory there is nothing to initialize.
    #[tokio::test]
    async fn test_try_initialize_none() {
        let outcome = SystemBootstrap::try_initialize(None, None).await;
        assert!(outcome.is_none());
    }

    /// With a usable directory, `try_initialize` returns the repositories.
    #[tokio::test]
    async fn test_try_initialize_some() {
        let (_guard, system_dir) = scratch_system_dir();

        let outcome = SystemBootstrap::try_initialize(Some(system_dir), None).await;
        assert!(outcome.is_some());
    }

    /// Restarting with the same bootstrap tenant must not create a second one.
    #[tokio::test]
    async fn test_no_duplicate_bootstrap_on_restart() {
        let (_guard, system_dir) = scratch_system_dir();

        // First boot creates the tenant; its handles are dropped immediately.
        SystemBootstrap::initialize(system_dir.clone(), Some("ACME".to_string()))
            .await
            .unwrap();

        // Second boot sees the existing tenant and skips creation.
        let restarted = SystemBootstrap::initialize(system_dir, Some("ACME".to_string()))
            .await
            .unwrap();

        assert_eq!(restarted.tenant_repository.count().await.unwrap(), 1);
    }
}