Skip to main content

ave_core/system/
mod.rs

1pub use error::SystemError;
2use std::{collections::HashMap, path::PathBuf, sync::Arc};
3
4use crate::{
5    config::{Config, GovernanceSyncConfig, SinkAuth, TrackerSyncConfig},
6    db::Database,
7    external_db::DBManager,
8    helpers::{db::ExternalDB, sink::AveSink},
9    model::common::contract::WasmRuntime,
10};
11use ave_actors::{
12    ActorSystem, DbManager, EncryptedKey, MachineSpec, SystemRef,
13};
14use ave_common::identity::hash_borsh;
15use serde::{Deserialize, Serialize};
16use tokio::{sync::RwLock, task::JoinHandle};
17use tokio_util::sync::CancellationToken;
18use tracing::error;
19use wasmtime::Module;
20
21pub mod error;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct ConfigHelper {
25    pub contracts_path: PathBuf,
26    pub always_accept: bool,
27    pub safe_mode: bool,
28    pub tracking_size: usize,
29    pub sync_governance: GovernanceSyncConfig,
30    pub sync_tracker: TrackerSyncConfig,
31}
32
33impl From<Config> for ConfigHelper {
34    fn from(value: Config) -> Self {
35        Self {
36            contracts_path: value.contracts_path,
37            always_accept: value.always_accept,
38            safe_mode: value.safe_mode,
39            tracking_size: value.tracking_size,
40            sync_governance: value.sync.governance,
41            sync_tracker: value.sync.tracker,
42        }
43    }
44}
45
46pub async fn system(
47    config: Config,
48    sink_auth: SinkAuth,
49    password: &str,
50    graceful_token: CancellationToken,
51    crash_token: CancellationToken,
52) -> Result<(SystemRef, JoinHandle<()>), SystemError> {
53    // Create de actor system.
54    let (system, mut runner) =
55        ActorSystem::create(graceful_token.clone(), crash_token.clone());
56
57    system
58        .add_helper("config", ConfigHelper::from(config.clone()))
59        .await;
60
61    // Build engine + limits together; actors fetch both via a single helper access.
62    let wasm_runtime = WasmRuntime::new(config.spec.clone())
63        .map_err(|e| SystemError::EngineCreation(e.to_string()))?;
64    system
65        .add_helper("wasm_runtime", Arc::new(wasm_runtime))
66        .await;
67
68    let contracts: HashMap<String, Arc<Module>> = HashMap::new();
69    system
70        .add_helper("contracts", Arc::new(RwLock::new(contracts)))
71        .await;
72
73    let actor_spec = config.spec.clone().map(MachineSpec::from);
74
75    // Build database manager.
76    let mut db = Database::open(&config.internal_db, actor_spec)
77        .map_err(|e| SystemError::DatabaseOpen(e.to_string()))?;
78    system.add_helper("store", db.clone()).await;
79
80    // Build sink manager.
81    let api_key = if sink_auth.api_key.is_empty() {
82        None
83    } else {
84        Some(sink_auth.api_key.clone())
85    };
86    let ave_sink = AveSink::new(
87        sink_auth.sink.sinks,
88        sink_auth.token,
89        &sink_auth.sink.auth,
90        &sink_auth.sink.username,
91        &sink_auth.password,
92        api_key,
93    );
94    system.add_helper("sink", ave_sink).await;
95
96    let pass_hash =
97        hash_borsh(&*config.hash_algorithm.hasher(), &password.to_string())
98            .map_err(|e| SystemError::PasswordHash(e.to_string()))?;
99
100    let array_hash: [u8; 32] = pass_hash
101        .hash_array()
102        .map_err(|e| SystemError::HashArrayConversion(e.to_string()))?;
103
104    // Helper memory encryption for passwords to be used in secure stores.
105    let encrypted_key = EncryptedKey::new(&array_hash)
106        .map_err(|e| SystemError::EncryptedKeyCreation(e.to_string()))?;
107
108    system.add_helper("encrypted_key", encrypted_key).await;
109
110    let db_manager_actor = system
111        .create_root_actor("db_manager", DBManager)
112        .await
113        .map_err(|e| SystemError::RootActorCreation(e.to_string()))?;
114
115    let ext_db = Arc::new(
116        ExternalDB::build(
117            config.external_db.db,
118            config.external_db.durability,
119            db_manager_actor,
120            config.spec.clone(),
121        )
122        .await
123        .map_err(|e| SystemError::ExternalDbBuild(e.to_string()))?,
124    );
125
126    system.add_helper("ext_db", Arc::clone(&ext_db)).await;
127
128    let runner = tokio::spawn(async move {
129        runner.run().await;
130        if let Err(e) = ext_db.shutdown().await {
131            error!(error = %e, "Failed to stop external db");
132        };
133        if let Err(e) = db.stop() {
134            error!(error = %e, "Failed to stop db");
135        };
136    });
137
138    Ok((system, runner))
139}
140
141#[cfg(test)]
142pub mod tests {
143
144    use ave_common::identity::{HashAlgorithm, KeyPairAlgorithm};
145    use network::Config as NetworkConfig;
146    use tempfile::TempDir;
147    use test_log::test;
148
149    use crate::config::{
150        AveExternalDBConfig, AveExternalDBFeatureConfig, AveInternalDBConfig,
151        AveInternalDBFeatureConfig, SyncConfig,
152    };
153
154    use super::*;
155
156    #[derive(Debug, Clone)]
157    pub struct Dummy;
158
159    #[test(tokio::test)]
160    async fn test_system() {
161        let (system, _runner, _dirs) = create_system().await;
162        let db: Option<Database> = system.get_helper("store").await;
163        assert!(db.is_some());
164        let ep: Option<EncryptedKey> = system.get_helper("encrypted_key").await;
165        assert!(ep.is_some());
166        let any: Option<Dummy> = system.get_helper("dummy").await;
167        assert!(any.is_none());
168    }
169
170    pub async fn create_system() -> (SystemRef, JoinHandle<()>, Vec<TempDir>) {
171        let mut vec_dirs = vec![];
172
173        let dir_ave_db =
174            tempfile::tempdir().expect("Can not create temporal directory");
175        let ave_path = dir_ave_db.path().to_path_buf();
176        vec_dirs.push(dir_ave_db);
177
178        let dir_ext_db =
179            tempfile::tempdir().expect("Can not create temporal directory");
180        let ext_path = dir_ext_db.path().to_path_buf();
181        vec_dirs.push(dir_ext_db);
182
183        let dir_contracts =
184            tempfile::tempdir().expect("Can not create temporal directory");
185        let contracts_path = dir_contracts.path().to_path_buf();
186        vec_dirs.push(dir_contracts);
187
188        let newtork_config = NetworkConfig::new(
189            network::NodeType::Bootstrap,
190            vec![],
191            vec![],
192            vec![],
193        );
194        let config = Config {
195            keypair_algorithm: KeyPairAlgorithm::Ed25519,
196            hash_algorithm: HashAlgorithm::Blake3,
197            internal_db: AveInternalDBConfig {
198                db: AveInternalDBFeatureConfig::build(&ave_path),
199                ..Default::default()
200            },
201            external_db: AveExternalDBConfig {
202                db: AveExternalDBFeatureConfig::build(&ext_path),
203                ..Default::default()
204            },
205            network: newtork_config,
206            contracts_path: contracts_path,
207            always_accept: false,
208            tracking_size: 100,
209            safe_mode: false,
210            is_service: true,
211            sync: SyncConfig {
212                governance: GovernanceSyncConfig {
213                    interval_secs: 60,
214                    sample_size: 3,
215                    response_timeout_secs: 30,
216                },
217                tracker: TrackerSyncConfig {
218                    interval_secs: 60,
219                    page_size: 200,
220                    response_timeout_secs: 10,
221                    update_batch_size: 2,
222                    update_timeout_secs: 10,
223                },
224            },
225            spec: None,
226        };
227
228        let (sys, handlers) = system(
229            config.clone(),
230            SinkAuth::default(),
231            "password",
232            CancellationToken::new(),
233            CancellationToken::new(),
234        )
235        .await
236        .unwrap();
237
238        (sys, handlers, vec_dirs)
239    }
240}