allsource_core/infrastructure/persistence/
system_bootstrap.rs1use crate::{
2 domain::{entities::TenantQuotas, repositories::TenantRepository, value_objects::TenantId},
3 error::Result,
4 infrastructure::{
5 persistence::SystemMetadataStore,
6 repositories::{
7 EventSourcedAuditRepository, EventSourcedConfigRepository, EventSourcedTenantRepository,
8 },
9 },
10};
11use std::{path::PathBuf, sync::Arc};
12
13pub struct SystemRepositories {
19 pub system_store: Arc<SystemMetadataStore>,
21
22 pub tenant_repository: Arc<EventSourcedTenantRepository>,
24
25 pub audit_repository: Arc<EventSourcedAuditRepository>,
27
28 pub config_repository: Arc<EventSourcedConfigRepository>,
30}
31
32pub struct SystemBootstrap;
50
51impl SystemBootstrap {
52 pub async fn initialize(
61 system_data_dir: PathBuf,
62 bootstrap_tenant: Option<String>,
63 ) -> Result<SystemRepositories> {
64 tracing::info!(
66 "Stage 1: Initializing system metadata store at {}",
67 system_data_dir.display()
68 );
69 let system_store = Arc::new(SystemMetadataStore::new(&system_data_dir)?);
70 tracing::info!(
71 "System store initialized: {} events recovered",
72 system_store.total_events()
73 );
74
75 tracing::info!("Stage 2: Replaying system streams into repository caches");
77 let tenant_repository = Arc::new(EventSourcedTenantRepository::new(system_store.clone()));
78 let audit_repository = Arc::new(EventSourcedAuditRepository::new(system_store.clone()));
79 let config_repository = Arc::new(EventSourcedConfigRepository::new(system_store.clone()));
80
81 let tenant_count = tenant_repository.count().await.unwrap_or(0);
82 tracing::info!(
83 "System caches populated: {} tenants, {} config entries",
84 tenant_count,
85 config_repository.count()
86 );
87
88 if tenant_count == 0 {
90 if let Some(ref tenant_name) = bootstrap_tenant {
91 tracing::info!(
92 "Stage 3: First boot detected — creating default tenant '{}'",
93 tenant_name
94 );
95
96 let tenant_id_str = tenant_name
98 .to_lowercase()
99 .replace(' ', "-")
100 .chars()
101 .filter(|c| c.is_alphanumeric() || *c == '-' || *c == '_')
102 .collect::<String>();
103
104 match TenantId::new(tenant_id_str) {
105 Ok(tenant_id) => {
106 match tenant_repository
107 .create(tenant_id, tenant_name.clone(), TenantQuotas::unlimited())
108 .await
109 {
110 Ok(_) => {
111 tracing::info!("Default tenant '{}' created", tenant_name);
112 }
113 Err(e) => {
114 tracing::warn!("Failed to create default tenant: {}", e);
115 }
116 }
117 }
118 Err(e) => {
119 tracing::warn!("Invalid default tenant ID: {}", e);
120 }
121 }
122 } else {
123 tracing::info!(
124 "Stage 3: No bootstrap tenant configured, skipping first-boot setup"
125 );
126 }
127 } else {
128 tracing::info!("Stage 3: Skipped — {} existing tenants found", tenant_count);
129 }
130
131 tracing::info!("Stage 4: System metadata initialized — ready to accept traffic");
133
134 Ok(SystemRepositories {
135 system_store,
136 tenant_repository,
137 audit_repository,
138 config_repository,
139 })
140 }
141
142 pub async fn try_initialize(
148 system_data_dir: Option<PathBuf>,
149 bootstrap_tenant: Option<String>,
150 ) -> Option<SystemRepositories> {
151 match system_data_dir {
152 Some(dir) => match Self::initialize(dir, bootstrap_tenant).await {
153 Ok(repos) => Some(repos),
154 Err(e) => {
155 tracing::error!(
156 "Failed to initialize system metadata store: {}. \
157 Falling back to in-memory repositories.",
158 e
159 );
160 None
161 }
162 },
163 None => {
164 tracing::info!(
165 "No system_data_dir configured. Using in-memory repositories for metadata."
166 );
167 None
168 }
169 }
170 }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Creates a fresh temporary directory and the `__system` path inside it.
    ///
    /// The `TempDir` guard is returned alongside the path so the directory stays alive for
    /// as long as the calling test holds on to it.
    fn scratch_system_dir() -> (TempDir, PathBuf) {
        let guard = TempDir::new().unwrap();
        let system_dir = guard.path().join("__system");
        (guard, system_dir)
    }

    /// An empty store with no bootstrap tenant yields empty tenant and config repositories.
    #[tokio::test]
    async fn test_staged_initialization_empty() {
        let (_guard, system_dir) = scratch_system_dir();

        let repos = SystemBootstrap::initialize(system_dir, None).await.unwrap();

        let tenants = repos.tenant_repository.count().await.unwrap();
        assert_eq!(tenants, 0);
        assert_eq!(repos.config_repository.count(), 0);
    }

    /// A bootstrap tenant name produces one tenant whose ID is the slugged name.
    #[tokio::test]
    async fn test_staged_initialization_with_bootstrap_tenant() {
        let (_guard, system_dir) = scratch_system_dir();
        let bootstrap_name = Some("Default Tenant".to_string());

        let repos = SystemBootstrap::initialize(system_dir, bootstrap_name)
            .await
            .unwrap();

        assert_eq!(repos.tenant_repository.count().await.unwrap(), 1);

        let expected_id = TenantId::new("default-tenant".to_string()).unwrap();
        let lookup = repos
            .tenant_repository
            .find_by_id(&expected_id)
            .await
            .unwrap();
        assert!(lookup.is_some());
        let tenant = lookup.unwrap();
        assert_eq!(tenant.name(), "Default Tenant");
    }

    /// Tenants and config written on the first boot are visible after re-initializing.
    #[tokio::test]
    async fn test_staged_initialization_recovery() {
        let (_guard, system_dir) = scratch_system_dir();

        // First boot: bootstrap a tenant and store one config entry.
        let first_boot =
            SystemBootstrap::initialize(system_dir.clone(), Some("ACME Corp".to_string()))
                .await
                .unwrap();

        assert_eq!(first_boot.tenant_repository.count().await.unwrap(), 1);

        first_boot
            .config_repository
            .set("feature_flag", serde_json::json!(true), None)
            .unwrap();

        // Release every handle from the first boot before opening the directory again.
        drop(first_boot);

        // Second boot: no bootstrap tenant, state must come back from disk.
        let second_boot = SystemBootstrap::initialize(system_dir, None).await.unwrap();

        assert_eq!(second_boot.tenant_repository.count().await.unwrap(), 1);

        assert_eq!(second_boot.config_repository.count(), 1);
        assert_eq!(
            second_boot
                .config_repository
                .get_value("feature_flag")
                .unwrap(),
            serde_json::json!(true)
        );
    }

    /// Without a data directory, `try_initialize` returns `None`.
    #[tokio::test]
    async fn test_try_initialize_none() {
        let outcome = SystemBootstrap::try_initialize(None, None).await;
        assert!(outcome.is_none());
    }

    /// With a usable data directory, `try_initialize` returns `Some`.
    #[tokio::test]
    async fn test_try_initialize_some() {
        let (_guard, system_dir) = scratch_system_dir();

        let outcome = SystemBootstrap::try_initialize(Some(system_dir), None).await;
        assert!(outcome.is_some());
    }

    /// Initializing twice with the same bootstrap tenant still leaves exactly one tenant.
    #[tokio::test]
    async fn test_no_duplicate_bootstrap_on_restart() {
        let (_guard, system_dir) = scratch_system_dir();

        // First boot; the returned repositories are dropped at the end of this statement.
        SystemBootstrap::initialize(system_dir.clone(), Some("ACME".to_string()))
            .await
            .unwrap();

        let restarted = SystemBootstrap::initialize(system_dir, Some("ACME".to_string()))
            .await
            .unwrap();

        assert_eq!(restarted.tenant_repository.count().await.unwrap(), 1);
    }
}