Skip to main content

allsource_core/infrastructure/persistence/
system_bootstrap.rs

1#[cfg(feature = "server")]
2use crate::infrastructure::repositories::EventSourcedAuthRepository;
3use crate::{
4    application::services::consumer::ConsumerRegistry,
5    domain::{entities::TenantQuotas, repositories::TenantRepository, value_objects::TenantId},
6    error::Result,
7    infrastructure::{
8        persistence::SystemMetadataStore,
9        repositories::{
10            EventSourcedAuditRepository, EventSourcedConfigRepository, EventSourcedTenantRepository,
11        },
12    },
13};
14use std::{path::PathBuf, sync::Arc};
15
16/// Holds all event-sourced repositories for system metadata.
17///
18/// This is the result of a successful system bootstrap. All repositories
19/// share a single `SystemMetadataStore` for storage, ensuring atomic
20/// write-ahead logging and coordinated recovery.
21pub struct SystemRepositories {
22    /// The underlying durable store
23    pub system_store: Arc<SystemMetadataStore>,
24
25    /// Event-sourced tenant repository
26    pub tenant_repository: Arc<EventSourcedTenantRepository>,
27
28    /// Event-sourced audit repository
29    pub audit_repository: Arc<EventSourcedAuditRepository>,
30
31    /// Event-sourced config repository
32    pub config_repository: Arc<EventSourcedConfigRepository>,
33
34    /// Event-sourced auth repository (API keys)
35    #[cfg(feature = "server")]
36    pub auth_repository: Arc<EventSourcedAuthRepository>,
37
38    /// Durable consumer registry for subscription cursor tracking
39    pub consumer_registry: Arc<ConsumerRegistry>,
40}
41
/// Stateless entry point for staged system-metadata initialization.
///
/// The type holds no data; it only namespaces the bootstrap routines. The
/// design follows the AWS data plane / control plane separation pattern and
/// proceeds through four stages:
///
/// ```text
/// Stage 1: Initialize SystemMetadataStore from local storage
/// Stage 2: Replay _system/* streams → populate in-memory caches
/// Stage 3: If empty, run first-boot bootstrap (create default tenant)
/// Stage 4: Return SystemRepositories → start accepting traffic
/// ```
///
/// # Static Stability
///
/// Design intent (not enforced by this type itself): once bootstrap has
/// finished, metadata reads are served from the repositories' in-memory
/// caches. A failing system stream write should be logged without blocking
/// the data plane, so user event ingestion and queries keep running on
/// cached metadata until the system store recovers.
pub struct SystemBootstrap;
60
61impl SystemBootstrap {
62    /// Execute the staged initialization sequence.
63    ///
64    /// # Arguments
65    /// * `system_data_dir` — Path to the system metadata storage directory
66    /// * `bootstrap_tenant` — Optional default tenant name for first-boot
67    ///
68    /// # Returns
69    /// `SystemRepositories` with all event-sourced repos initialized and caches populated.
70    #[cfg_attr(feature = "hotpath", hotpath::measure)]
71    pub async fn initialize(
72        system_data_dir: PathBuf,
73        bootstrap_tenant: Option<String>,
74    ) -> Result<SystemRepositories> {
75        // Stage 1: Initialize SystemMetadataStore
76        tracing::info!(
77            "Stage 1: Initializing system metadata store at {}",
78            system_data_dir.display()
79        );
80        let system_store = Arc::new(SystemMetadataStore::new(&system_data_dir)?);
81        tracing::info!(
82            "System store initialized: {} events recovered",
83            system_store.total_events()
84        );
85
86        // Stage 2: Replay system streams → populate in-memory caches
87        tracing::info!("Stage 2: Replaying system streams into repository caches");
88        let tenant_repository = Arc::new(EventSourcedTenantRepository::new(system_store.clone()));
89        let audit_repository = Arc::new(EventSourcedAuditRepository::new(system_store.clone()));
90        let config_repository = Arc::new(EventSourcedConfigRepository::new(system_store.clone()));
91        #[cfg(feature = "server")]
92        let auth_repository = Arc::new(EventSourcedAuthRepository::new(system_store.clone()));
93        let consumer_registry = Arc::new(ConsumerRegistry::new_durable(system_store.clone()));
94
95        let tenant_count = tenant_repository.count().await.unwrap_or(0);
96        #[cfg(feature = "server")]
97        tracing::info!(
98            "System caches populated: {} tenants, {} config entries, {} API keys ({} active), {} consumers",
99            tenant_count,
100            config_repository.count(),
101            auth_repository.count(),
102            auth_repository.active_count(),
103            consumer_registry.count()
104        );
105        #[cfg(not(feature = "server"))]
106        tracing::info!(
107            "System caches populated: {} tenants, {} config entries, {} consumers",
108            tenant_count,
109            config_repository.count(),
110            consumer_registry.count()
111        );
112
113        // Stage 3: First-boot bootstrap (if needed)
114        if tenant_count == 0 {
115            if let Some(ref tenant_name) = bootstrap_tenant {
116                tracing::info!(
117                    "Stage 3: First boot detected — creating default tenant '{}'",
118                    tenant_name
119                );
120
121                // Use the tenant name as the tenant ID (lowercase, hyphens)
122                let tenant_id_str = tenant_name
123                    .to_lowercase()
124                    .replace(' ', "-")
125                    .chars()
126                    .filter(|c| c.is_alphanumeric() || *c == '-' || *c == '_')
127                    .collect::<String>();
128
129                match TenantId::new(tenant_id_str) {
130                    Ok(tenant_id) => {
131                        match tenant_repository
132                            .create(tenant_id, tenant_name.clone(), TenantQuotas::unlimited())
133                            .await
134                        {
135                            Ok(_) => {
136                                tracing::info!("Default tenant '{}' created", tenant_name);
137                            }
138                            Err(e) => {
139                                tracing::warn!("Failed to create default tenant: {}", e);
140                            }
141                        }
142                    }
143                    Err(e) => {
144                        tracing::warn!("Invalid default tenant ID: {}", e);
145                    }
146                }
147            } else {
148                tracing::info!(
149                    "Stage 3: No bootstrap tenant configured, skipping first-boot setup"
150                );
151            }
152        } else {
153            tracing::info!("Stage 3: Skipped — {} existing tenants found", tenant_count);
154        }
155
156        // Stage 4: Ready
157        tracing::info!("Stage 4: System metadata initialized — ready to accept traffic");
158
159        Ok(SystemRepositories {
160            system_store,
161            tenant_repository,
162            audit_repository,
163            config_repository,
164            #[cfg(feature = "server")]
165            auth_repository,
166            consumer_registry,
167        })
168    }
169
170    /// Try to initialize system repositories, falling back to None if the
171    /// system data directory is not configured.
172    ///
173    /// This implements the backward-compatibility path: if no system_data_dir
174    /// is set, the system continues with in-memory repositories.
175    pub async fn try_initialize(
176        system_data_dir: Option<PathBuf>,
177        bootstrap_tenant: Option<String>,
178    ) -> Option<SystemRepositories> {
179        if let Some(dir) = system_data_dir {
180            match Self::initialize(dir, bootstrap_tenant).await {
181                Ok(repos) => Some(repos),
182                Err(e) => {
183                    tracing::error!(
184                        "Failed to initialize system metadata store: {}. \
185                     Falling back to in-memory repositories.",
186                        e
187                    );
188                    None
189                }
190            }
191        } else {
192            tracing::info!(
193                "No system_data_dir configured. Using in-memory repositories for metadata."
194            );
195            None
196        }
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Path of the `__system` directory inside the given scratch directory.
    fn system_dir_in(scratch: &TempDir) -> PathBuf {
        scratch.path().join("__system")
    }

    /// With no bootstrap tenant, a fresh store comes up with nothing in it.
    #[tokio::test]
    async fn test_staged_initialization_empty() {
        let scratch = TempDir::new().unwrap();

        let repos = SystemBootstrap::initialize(system_dir_in(&scratch), None)
            .await
            .unwrap();

        let tenants = repos.tenant_repository.count().await.unwrap();
        assert_eq!(tenants, 0);
        assert_eq!(repos.config_repository.count(), 0);
    }

    /// First boot with a bootstrap tenant creates it under the derived ID.
    #[tokio::test]
    async fn test_staged_initialization_with_bootstrap_tenant() {
        let scratch = TempDir::new().unwrap();
        let bootstrap_name = "Default Tenant".to_string();

        let repos = SystemBootstrap::initialize(system_dir_in(&scratch), Some(bootstrap_name))
            .await
            .unwrap();

        assert_eq!(repos.tenant_repository.count().await.unwrap(), 1);

        let expected_id = TenantId::new("default-tenant".to_string()).unwrap();
        let found = repos
            .tenant_repository
            .find_by_id(&expected_id)
            .await
            .unwrap();
        assert!(found.is_some());
        let tenant = found.unwrap();
        assert_eq!(tenant.name(), "Default Tenant");
    }

    /// Tenants and config written on first boot are visible after a restart.
    #[tokio::test]
    async fn test_staged_initialization_recovery() {
        let scratch = TempDir::new().unwrap();
        let system_dir = system_dir_in(&scratch);

        // First boot
        let first_boot =
            SystemBootstrap::initialize(system_dir.clone(), Some("ACME Corp".to_string()))
                .await
                .unwrap();
        assert_eq!(first_boot.tenant_repository.count().await.unwrap(), 1);

        // Add some config
        first_boot
            .config_repository
            .set("feature_flag", serde_json::json!(true), None)
            .unwrap();

        // Release the first instance before reopening the same directory.
        drop(first_boot);

        // Restart — should recover everything
        let restarted = SystemBootstrap::initialize(system_dir, None).await.unwrap();

        // Tenant should survive
        assert_eq!(restarted.tenant_repository.count().await.unwrap(), 1);

        // Config should survive
        assert_eq!(restarted.config_repository.count(), 1);
        let flag = restarted
            .config_repository
            .get_value("feature_flag")
            .unwrap();
        assert_eq!(flag, serde_json::json!(true));
    }

    /// No configured directory means no repositories.
    #[tokio::test]
    async fn test_try_initialize_none() {
        let outcome = SystemBootstrap::try_initialize(None, None).await;
        assert!(outcome.is_none());
    }

    /// A usable directory yields repositories.
    #[tokio::test]
    async fn test_try_initialize_some() {
        let scratch = TempDir::new().unwrap();

        let outcome = SystemBootstrap::try_initialize(Some(system_dir_in(&scratch)), None).await;
        assert!(outcome.is_some());
    }

    /// Booting twice with the same bootstrap tenant must not duplicate it.
    #[tokio::test]
    async fn test_no_duplicate_bootstrap_on_restart() {
        let scratch = TempDir::new().unwrap();
        let system_dir = system_dir_in(&scratch);

        // First boot with bootstrap
        SystemBootstrap::initialize(system_dir.clone(), Some("ACME".to_string()))
            .await
            .unwrap();

        // Second boot with same bootstrap config — should NOT create duplicate
        let second_boot = SystemBootstrap::initialize(system_dir, Some("ACME".to_string()))
            .await
            .unwrap();

        assert_eq!(second_boot.tenant_repository.count().await.unwrap(), 1);
    }
}