Skip to main content

ave_core/system/
mod.rs

1pub use error::SystemError;
2use std::{collections::HashMap, path::PathBuf, sync::Arc};
3
4use crate::{
5    config::{
6        Config, GovernanceSyncConfig, RebootSyncConfig, SinkAuth,
7        TrackerSyncConfig, UpdateSyncConfig,
8    },
9    db::Database,
10    external_db::DBManager,
11    helpers::{db::ExternalDB, sink::AveSink},
12    model::common::contract::WasmRuntime,
13};
14use ave_actors::{
15    ActorSystem, DbManager, EncryptedKey, MachineSpec, SystemRef,
16};
17use ave_common::identity::hash_borsh;
18use serde::{Deserialize, Serialize};
19use tokio::{sync::RwLock, task::JoinHandle};
20use tokio_util::sync::CancellationToken;
21use tracing::error;
22use wasmtime::Module;
23
24pub mod error;
25
26#[derive(Debug, Clone, Serialize, Deserialize)]
27pub struct ConfigHelper {
28    pub contracts_path: PathBuf,
29    pub always_accept: bool,
30    pub safe_mode: bool,
31    pub tracking_size: usize,
32    pub only_clear_events: bool,
33    pub ledger_batch_size: usize,
34    pub sync_governance: GovernanceSyncConfig,
35    pub sync_tracker: TrackerSyncConfig,
36    pub sync_update: UpdateSyncConfig,
37    pub sync_reboot: RebootSyncConfig,
38}
39
40impl From<Config> for ConfigHelper {
41    fn from(value: Config) -> Self {
42        Self {
43            contracts_path: value.contracts_path,
44            always_accept: value.always_accept,
45            safe_mode: value.safe_mode,
46            tracking_size: value.tracking_size,
47            only_clear_events: value.only_clear_events,
48            ledger_batch_size: value.sync.ledger_batch_size,
49            sync_governance: value.sync.governance,
50            sync_tracker: value.sync.tracker,
51            sync_update: value.sync.update,
52            sync_reboot: value.sync.reboot,
53        }
54    }
55}
56
57pub async fn system(
58    config: Config,
59    sink_auth: SinkAuth,
60    password: &str,
61    graceful_token: CancellationToken,
62    crash_token: CancellationToken,
63) -> Result<(SystemRef, JoinHandle<()>), SystemError> {
64    // Create de actor system.
65    let (system, mut runner) =
66        ActorSystem::create(graceful_token.clone(), crash_token.clone());
67
68    system
69        .add_helper("config", ConfigHelper::from(config.clone()))
70        .await;
71
72    // Build engine + limits together; actors fetch both via a single helper access.
73    let wasm_runtime = WasmRuntime::new(config.spec.clone())
74        .map_err(|e| SystemError::EngineCreation(e.to_string()))?;
75    system
76        .add_helper("wasm_runtime", Arc::new(wasm_runtime))
77        .await;
78
79    let contracts: HashMap<String, Arc<Module>> = HashMap::new();
80    system
81        .add_helper("contracts", Arc::new(RwLock::new(contracts)))
82        .await;
83
84    let actor_spec = config.spec.clone().map(MachineSpec::from);
85
86    // Build database manager.
87    let mut db = Database::open(&config.internal_db, actor_spec)
88        .map_err(|e| SystemError::DatabaseOpen(e.to_string()))?;
89    system.add_helper("store", db.clone()).await;
90
91    // Build sink manager.
92    let api_key = if sink_auth.api_key.is_empty() {
93        None
94    } else {
95        Some(sink_auth.api_key.clone())
96    };
97    let ave_sink = AveSink::new(
98        sink_auth.sink.sinks,
99        sink_auth.token,
100        &sink_auth.sink.auth,
101        &sink_auth.sink.username,
102        &sink_auth.password,
103        api_key,
104    );
105    system.add_helper("sink", ave_sink).await;
106
107    let pass_hash =
108        hash_borsh(&*config.hash_algorithm.hasher(), &password.to_string())
109            .map_err(|e| SystemError::PasswordHash(e.to_string()))?;
110
111    let array_hash: [u8; 32] = pass_hash
112        .hash_array()
113        .map_err(|e| SystemError::HashArrayConversion(e.to_string()))?;
114
115    // Helper memory encryption for passwords to be used in secure stores.
116    let encrypted_key = EncryptedKey::new(&array_hash)
117        .map_err(|e| SystemError::EncryptedKeyCreation(e.to_string()))?;
118
119    system.add_helper("encrypted_key", encrypted_key).await;
120
121    let db_manager_actor = system
122        .create_root_actor("db_manager", DBManager)
123        .await
124        .map_err(|e| SystemError::RootActorCreation(e.to_string()))?;
125
126    let ext_db = Arc::new(
127        ExternalDB::build(
128            config.external_db.db,
129            config.external_db.durability,
130            db_manager_actor,
131            config.spec.clone(),
132        )
133        .await
134        .map_err(|e| SystemError::ExternalDbBuild(e.to_string()))?,
135    );
136
137    system.add_helper("ext_db", Arc::clone(&ext_db)).await;
138
139    let runner = tokio::spawn(async move {
140        runner.run().await;
141        if let Err(e) = ext_db.shutdown().await {
142            error!(error = %e, "Failed to stop external db");
143        };
144        if let Err(e) = db.stop() {
145            error!(error = %e, "Failed to stop db");
146        };
147    });
148
149    Ok((system, runner))
150}
151
152#[cfg(test)]
153pub mod tests {
154
155    use std::{
156        env, process,
157        sync::atomic::{AtomicU64, Ordering},
158    };
159
160    use ave_common::identity::{HashAlgorithm, KeyPairAlgorithm};
161    use ave_network::Config as NetworkConfig;
162    use tempfile::TempDir;
163    use test_log::test;
164
165    use crate::config::{
166        AveExternalDBConfig, AveExternalDBFeatureConfig, AveInternalDBConfig,
167        AveInternalDBFeatureConfig, SyncConfig,
168    };
169
170    use super::*;
171
172    static CONTRACTS_COUNTER: AtomicU64 = AtomicU64::new(0);
173
174    fn create_contracts_temp_dir() -> TempDir {
175        tempfile::Builder::new()
176            .prefix(&format!(
177                "ave-test-contracts-{}-{}-",
178                process::id(),
179                CONTRACTS_COUNTER.fetch_add(1, Ordering::SeqCst)
180            ))
181            .tempdir_in(env::temp_dir())
182            .expect("Can not create temporal directory")
183    }
184
185    #[derive(Debug, Clone)]
186    pub struct Dummy;
187
188    #[test(tokio::test)]
189    async fn test_system() {
190        let (system, _runner, _dirs) = create_system().await;
191        let db: Option<Database> = system.get_helper("store").await;
192        assert!(db.is_some());
193        let ep: Option<EncryptedKey> = system.get_helper("encrypted_key").await;
194        assert!(ep.is_some());
195        let any: Option<Dummy> = system.get_helper("dummy").await;
196        assert!(any.is_none());
197    }
198
199    pub async fn create_system() -> (SystemRef, JoinHandle<()>, Vec<TempDir>) {
200        let mut vec_dirs = vec![];
201
202        let dir_ave_db =
203            tempfile::tempdir().expect("Can not create temporal directory");
204        let ave_path = dir_ave_db.path().to_path_buf();
205        vec_dirs.push(dir_ave_db);
206
207        let dir_ext_db =
208            tempfile::tempdir().expect("Can not create temporal directory");
209        let ext_path = dir_ext_db.path().to_path_buf();
210        vec_dirs.push(dir_ext_db);
211
212        let dir_contracts = create_contracts_temp_dir();
213        let contracts_path = dir_contracts.path().to_path_buf();
214        vec_dirs.push(dir_contracts);
215
216        let newtork_config = NetworkConfig::new(
217            ave_network::NodeType::Bootstrap,
218            vec![],
219            vec![],
220            vec![],
221        );
222        let config = Config {
223            keypair_algorithm: KeyPairAlgorithm::Ed25519,
224            hash_algorithm: HashAlgorithm::Blake3,
225            internal_db: AveInternalDBConfig {
226                db: AveInternalDBFeatureConfig::build(&ave_path),
227                ..Default::default()
228            },
229            external_db: AveExternalDBConfig {
230                db: AveExternalDBFeatureConfig::build(&ext_path),
231                ..Default::default()
232            },
233            network: newtork_config,
234            contracts_path: contracts_path,
235            always_accept: false,
236            tracking_size: 100,
237            safe_mode: false,
238            is_service: true,
239            only_clear_events: false,
240            sync: SyncConfig {
241                governance: GovernanceSyncConfig {
242                    interval_secs: 60,
243                    sample_size: 3,
244                    response_timeout_secs: 30,
245                },
246                tracker: TrackerSyncConfig {
247                    interval_secs: 60,
248                    page_size: 200,
249                    response_timeout_secs: 10,
250                    update_batch_size: 2,
251                    update_timeout_secs: 10,
252                },
253                update: UpdateSyncConfig::default(),
254                reboot: RebootSyncConfig::default(),
255                ledger_batch_size: 100,
256            },
257            spec: None,
258        };
259
260        let (sys, handlers) = system(
261            config.clone(),
262            SinkAuth::default(),
263            "password",
264            CancellationToken::new(),
265            CancellationToken::new(),
266        )
267        .await
268        .unwrap();
269
270        (sys, handlers, vec_dirs)
271    }
272}