Skip to main content

allsource_core/infrastructure/persistence/
system_bootstrap.rs

1use crate::{
2    domain::{entities::TenantQuotas, repositories::TenantRepository, value_objects::TenantId},
3    error::Result,
4    infrastructure::{
5        persistence::SystemMetadataStore,
6        repositories::{
7            EventSourcedAuditRepository, EventSourcedConfigRepository, EventSourcedTenantRepository,
8        },
9    },
10};
11use std::{path::PathBuf, sync::Arc};
12
13/// Holds all event-sourced repositories for system metadata.
14///
15/// This is the result of a successful system bootstrap. All repositories
16/// share a single `SystemMetadataStore` for storage, ensuring atomic
17/// write-ahead logging and coordinated recovery.
18pub struct SystemRepositories {
19    /// The underlying durable store
20    pub system_store: Arc<SystemMetadataStore>,
21
22    /// Event-sourced tenant repository
23    pub tenant_repository: Arc<EventSourcedTenantRepository>,
24
25    /// Event-sourced audit repository
26    pub audit_repository: Arc<EventSourcedAuditRepository>,
27
28    /// Event-sourced config repository
29    pub config_repository: Arc<EventSourcedConfigRepository>,
30}
31
/// Staged system initialization with static stability.
///
/// Follows the AWS data plane / control plane separation pattern:
///
/// ```text
/// Stage 1: Initialize SystemMetadataStore from local storage
/// Stage 2: Replay _system/* streams → populate in-memory caches
/// Stage 3: If no tenants exist, run first-boot bootstrap (create default tenant)
/// Stage 4: Return SystemRepositories → start accepting traffic
/// ```
///
/// # Static Stability
///
/// Design intent: after bootstrap, metadata reads are served from the
/// repositories' in-memory caches (DashMap), and a failed system-stream write
/// is logged rather than blocking the data plane, so user event ingestion and
/// queries keep running on cached metadata until the system store recovers.
/// That behaviour is implemented by the repositories, not by this type.
///
/// This is a stateless namespace type; the entry points are
/// [`SystemBootstrap::initialize`] and [`SystemBootstrap::try_initialize`].
#[derive(Debug, Clone, Copy, Default)]
pub struct SystemBootstrap;
50
51impl SystemBootstrap {
52    /// Execute the staged initialization sequence.
53    ///
54    /// # Arguments
55    /// * `system_data_dir` — Path to the system metadata storage directory
56    /// * `bootstrap_tenant` — Optional default tenant name for first-boot
57    ///
58    /// # Returns
59    /// `SystemRepositories` with all event-sourced repos initialized and caches populated.
60    pub async fn initialize(
61        system_data_dir: PathBuf,
62        bootstrap_tenant: Option<String>,
63    ) -> Result<SystemRepositories> {
64        // Stage 1: Initialize SystemMetadataStore
65        tracing::info!(
66            "Stage 1: Initializing system metadata store at {}",
67            system_data_dir.display()
68        );
69        let system_store = Arc::new(SystemMetadataStore::new(&system_data_dir)?);
70        tracing::info!(
71            "System store initialized: {} events recovered",
72            system_store.total_events()
73        );
74
75        // Stage 2: Replay system streams → populate in-memory caches
76        tracing::info!("Stage 2: Replaying system streams into repository caches");
77        let tenant_repository = Arc::new(EventSourcedTenantRepository::new(system_store.clone()));
78        let audit_repository = Arc::new(EventSourcedAuditRepository::new(system_store.clone()));
79        let config_repository = Arc::new(EventSourcedConfigRepository::new(system_store.clone()));
80
81        let tenant_count = tenant_repository.count().await.unwrap_or(0);
82        tracing::info!(
83            "System caches populated: {} tenants, {} config entries",
84            tenant_count,
85            config_repository.count()
86        );
87
88        // Stage 3: First-boot bootstrap (if needed)
89        if tenant_count == 0 {
90            if let Some(ref tenant_name) = bootstrap_tenant {
91                tracing::info!(
92                    "Stage 3: First boot detected — creating default tenant '{}'",
93                    tenant_name
94                );
95
96                // Use the tenant name as the tenant ID (lowercase, hyphens)
97                let tenant_id_str = tenant_name
98                    .to_lowercase()
99                    .replace(' ', "-")
100                    .chars()
101                    .filter(|c| c.is_alphanumeric() || *c == '-' || *c == '_')
102                    .collect::<String>();
103
104                match TenantId::new(tenant_id_str) {
105                    Ok(tenant_id) => {
106                        match tenant_repository
107                            .create(tenant_id, tenant_name.clone(), TenantQuotas::unlimited())
108                            .await
109                        {
110                            Ok(_) => {
111                                tracing::info!("Default tenant '{}' created", tenant_name);
112                            }
113                            Err(e) => {
114                                tracing::warn!("Failed to create default tenant: {}", e);
115                            }
116                        }
117                    }
118                    Err(e) => {
119                        tracing::warn!("Invalid default tenant ID: {}", e);
120                    }
121                }
122            } else {
123                tracing::info!(
124                    "Stage 3: No bootstrap tenant configured, skipping first-boot setup"
125                );
126            }
127        } else {
128            tracing::info!("Stage 3: Skipped — {} existing tenants found", tenant_count);
129        }
130
131        // Stage 4: Ready
132        tracing::info!("Stage 4: System metadata initialized — ready to accept traffic");
133
134        Ok(SystemRepositories {
135            system_store,
136            tenant_repository,
137            audit_repository,
138            config_repository,
139        })
140    }
141
142    /// Try to initialize system repositories, falling back to None if the
143    /// system data directory is not configured.
144    ///
145    /// This implements the backward-compatibility path: if no system_data_dir
146    /// is set, the system continues with in-memory repositories.
147    pub async fn try_initialize(
148        system_data_dir: Option<PathBuf>,
149        bootstrap_tenant: Option<String>,
150    ) -> Option<SystemRepositories> {
151        match system_data_dir {
152            Some(dir) => match Self::initialize(dir, bootstrap_tenant).await {
153                Ok(repos) => Some(repos),
154                Err(e) => {
155                    tracing::error!(
156                        "Failed to initialize system metadata store: {}. \
157                         Falling back to in-memory repositories.",
158                        e
159                    );
160                    None
161                }
162            },
163            None => {
164                tracing::info!(
165                    "No system_data_dir configured. Using in-memory repositories for metadata."
166                );
167                None
168            }
169        }
170    }
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Returns a fresh temporary directory plus the `__system` path inside it.
    ///
    /// Keep the returned `TempDir` bound for the whole test: dropping it deletes
    /// the directory.
    fn temp_system_dir() -> (TempDir, PathBuf) {
        let temp_dir = TempDir::new().expect("failed to create temp dir");
        let system_dir = temp_dir.path().join("__system");
        (temp_dir, system_dir)
    }

    #[tokio::test]
    async fn test_staged_initialization_empty() {
        let (_temp_dir, system_dir) = temp_system_dir();

        let repos = SystemBootstrap::initialize(system_dir, None).await.unwrap();

        assert_eq!(repos.tenant_repository.count().await.unwrap(), 0);
        assert_eq!(repos.config_repository.count(), 0);
    }

    #[tokio::test]
    async fn test_staged_initialization_with_bootstrap_tenant() {
        let (_temp_dir, system_dir) = temp_system_dir();

        let repos = SystemBootstrap::initialize(system_dir, Some("Default Tenant".to_string()))
            .await
            .unwrap();

        assert_eq!(repos.tenant_repository.count().await.unwrap(), 1);

        // The tenant ID is derived from the display name.
        let tenant_id = TenantId::new("default-tenant".to_string()).unwrap();
        let tenant = repos
            .tenant_repository
            .find_by_id(&tenant_id)
            .await
            .unwrap()
            .expect("bootstrap tenant should be stored under its derived ID");
        assert_eq!(tenant.name(), "Default Tenant");
    }

    #[tokio::test]
    async fn test_staged_initialization_recovery() {
        let (_temp_dir, system_dir) = temp_system_dir();

        // First boot
        {
            let repos =
                SystemBootstrap::initialize(system_dir.clone(), Some("ACME Corp".to_string()))
                    .await
                    .unwrap();

            assert_eq!(repos.tenant_repository.count().await.unwrap(), 1);

            // Add some config
            repos
                .config_repository
                .set("feature_flag", serde_json::json!(true), None)
                .unwrap();
        }

        // Restart — should recover everything
        {
            let repos = SystemBootstrap::initialize(system_dir, None).await.unwrap();

            // Tenant should survive
            assert_eq!(repos.tenant_repository.count().await.unwrap(), 1);

            // Config should survive
            assert_eq!(repos.config_repository.count(), 1);
            assert_eq!(
                repos.config_repository.get_value("feature_flag").unwrap(),
                serde_json::json!(true)
            );
        }
    }

    #[tokio::test]
    async fn test_try_initialize_none() {
        let result = SystemBootstrap::try_initialize(None, None).await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_try_initialize_some() {
        let (_temp_dir, system_dir) = temp_system_dir();

        let repos = SystemBootstrap::try_initialize(Some(system_dir), None)
            .await
            .expect("a configured system dir should yield persistent repositories");

        // No bootstrap tenant was requested, so nothing should have been created.
        assert_eq!(repos.tenant_repository.count().await.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_no_duplicate_bootstrap_on_restart() {
        let (_temp_dir, system_dir) = temp_system_dir();

        // First boot with bootstrap
        SystemBootstrap::initialize(system_dir.clone(), Some("ACME".to_string()))
            .await
            .unwrap();

        // Second boot with same bootstrap config — should NOT create duplicate
        let repos = SystemBootstrap::initialize(system_dir, Some("ACME".to_string()))
            .await
            .unwrap();

        assert_eq!(repos.tenant_repository.count().await.unwrap(), 1);
    }

    #[tokio::test]
    async fn test_bootstrap_tenant_ignored_when_tenants_exist() {
        let (_temp_dir, system_dir) = temp_system_dir();

        SystemBootstrap::initialize(system_dir.clone(), Some("ACME".to_string()))
            .await
            .unwrap();

        // A *different* bootstrap name on restart must not add a second tenant.
        let repos = SystemBootstrap::initialize(system_dir, Some("Globex".to_string()))
            .await
            .unwrap();

        assert_eq!(repos.tenant_repository.count().await.unwrap(), 1);
        let globex = TenantId::new("globex".to_string()).unwrap();
        assert!(repos
            .tenant_repository
            .find_by_id(&globex)
            .await
            .unwrap()
            .is_none());
    }
}