Skip to main content

ave_core/system/
mod.rs

1pub use error::SystemError;
2use std::{collections::HashMap, path::PathBuf, sync::Arc};
3
4use crate::{
5    config::{Config, SinkAuth},
6    db::Database,
7    external_db::DBManager,
8    helpers::{db::ExternalDB, sink::AveSink},
9    model::common::contract::WasmRuntime,
10};
11use ave_actors::{
12    ActorSystem, DbManager, EncryptedKey, MachineSpec, SystemRef,
13};
14use ave_common::identity::hash_borsh;
15use serde::{Deserialize, Serialize};
16use tokio::{sync::RwLock, task::JoinHandle};
17use tokio_util::sync::CancellationToken;
18use tracing::error;
19
20pub mod error;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ConfigHelper {
24    pub contracts_path: PathBuf,
25    pub always_accept: bool,
26    pub tracking_size: usize,
27}
28
29impl From<Config> for ConfigHelper {
30    fn from(value: Config) -> Self {
31        Self {
32            contracts_path: value.contracts_path,
33            always_accept: value.always_accept,
34            tracking_size: value.tracking_size,
35        }
36    }
37}
38
39pub async fn system(
40    config: Config,
41    sink_auth: SinkAuth,
42    password: &str,
43    graceful_token: CancellationToken,
44    crash_token: CancellationToken,
45) -> Result<(SystemRef, JoinHandle<()>), SystemError> {
46    // Create de actor system.
47    let (system, mut runner) =
48        ActorSystem::create(graceful_token.clone(), crash_token.clone());
49
50    system
51        .add_helper("config", ConfigHelper::from(config.clone()))
52        .await;
53
54    // Build engine + limits together; actors fetch both via a single helper access.
55    let wasm_runtime = WasmRuntime::new(config.spec.clone())
56        .map_err(|e| SystemError::EngineCreation(e.to_string()))?;
57    system
58        .add_helper("wasm_runtime", Arc::new(wasm_runtime))
59        .await;
60
61    let contracts: HashMap<String, Vec<u8>> = HashMap::new();
62    system
63        .add_helper("contracts", Arc::new(RwLock::new(contracts)))
64        .await;
65
66    let actor_spec = config.spec.clone().map(MachineSpec::from);
67
68    // Build database manager.
69    let mut db = Database::open(&config.internal_db, actor_spec)
70        .map_err(|e| SystemError::DatabaseOpen(e.to_string()))?;
71    system.add_helper("store", db.clone()).await;
72
73    // Build sink manager.
74    let api_key = if sink_auth.api_key.is_empty() {
75        None
76    } else {
77        Some(sink_auth.api_key.clone())
78    };
79    let ave_sink = AveSink::new(
80        sink_auth.sink.sinks,
81        sink_auth.token,
82        &sink_auth.sink.auth,
83        &sink_auth.sink.username,
84        &sink_auth.password,
85        api_key,
86    );
87    system.add_helper("sink", ave_sink).await;
88
89    let pass_hash =
90        hash_borsh(&*config.hash_algorithm.hasher(), &password.to_string())
91            .map_err(|e| SystemError::PasswordHash(e.to_string()))?;
92
93    let array_hash: [u8; 32] = pass_hash
94        .hash_array()
95        .map_err(|e| SystemError::HashArrayConversion(e.to_string()))?;
96
97    // Helper memory encryption for passwords to be used in secure stores.
98    let encrypted_key = EncryptedKey::new(&array_hash)
99        .map_err(|e| SystemError::EncryptedKeyCreation(e.to_string()))?;
100
101    system.add_helper("encrypted_key", encrypted_key).await;
102
103    let db_manager_actor = system
104        .create_root_actor("db_manager", DBManager)
105        .await
106        .map_err(|e| SystemError::RootActorCreation(e.to_string()))?;
107
108    let ext_db = Arc::new(
109        ExternalDB::build(
110        config.external_db.db,
111        config.external_db.durability,
112        db_manager_actor,
113        config.spec.clone(),
114    )
115    .await
116    .map_err(|e| SystemError::ExternalDbBuild(e.to_string()))?,
117    );
118
119    system.add_helper("ext_db", Arc::clone(&ext_db)).await;
120
121    let runner = tokio::spawn(async move {
122        runner.run().await;
123        if let Err(e) = ext_db.shutdown().await {
124            error!(error = %e, "Failed to stop external db");
125        };
126        if let Err(e) = db.stop() {
127            error!(error = %e, "Failed to stop db");
128        };
129    });
130
131    Ok((system, runner))
132}
133
#[cfg(test)]
pub mod tests {

    use ave_common::identity::{HashAlgorithm, KeyPairAlgorithm};
    use network::Config as NetworkConfig;
    use tempfile::TempDir;
    use test_log::test;

    use crate::config::{
        AveExternalDBConfig, AveExternalDBFeatureConfig, AveInternalDBConfig,
        AveInternalDBFeatureConfig,
    };

    use super::*;

    /// Placeholder type used to look up a helper key that is never registered.
    #[derive(Debug, Clone)]
    pub struct Dummy;

    /// The `"store"` and `"encrypted_key"` helpers must be present after
    /// start-up, while an unknown key must yield `None`.
    #[test(tokio::test)]
    async fn test_system() {
        let (system, _runner, _dirs) = create_system().await;

        let store: Option<Database> = system.get_helper("store").await;
        assert!(store.is_some());

        let key: Option<EncryptedKey> =
            system.get_helper("encrypted_key").await;
        assert!(key.is_some());

        let missing: Option<Dummy> = system.get_helper("dummy").await;
        assert!(missing.is_none());
    }

    /// Creates a temporary directory, stores its guard in `keep_alive` and
    /// returns the directory path.
    fn scratch_dir(keep_alive: &mut Vec<TempDir>) -> PathBuf {
        let guard =
            tempfile::tempdir().expect("Can not create temporal directory");
        let path = guard.path().to_path_buf();
        keep_alive.push(guard);
        path
    }

    /// Starts a system backed by three fresh temporary directories (internal
    /// database, external database and contracts, in that order).
    ///
    /// The directory guards are returned alongside the system and the runner
    /// handle so the caller decides how long the directories live.
    pub async fn create_system() -> (SystemRef, JoinHandle<()>, Vec<TempDir>) {
        let mut temp_dirs = Vec::new();

        let internal_db_path = scratch_dir(&mut temp_dirs);
        let external_db_path = scratch_dir(&mut temp_dirs);
        let contracts_path = scratch_dir(&mut temp_dirs);

        let network = NetworkConfig::new(
            network::NodeType::Bootstrap,
            vec![],
            vec![],
            vec![],
        );

        let internal_db = AveInternalDBConfig {
            db: AveInternalDBFeatureConfig::build(&internal_db_path),
            ..Default::default()
        };
        let external_db = AveExternalDBConfig {
            db: AveExternalDBFeatureConfig::build(&external_db_path),
            ..Default::default()
        };

        let config = Config {
            keypair_algorithm: KeyPairAlgorithm::Ed25519,
            hash_algorithm: HashAlgorithm::Blake3,
            internal_db,
            external_db,
            network,
            contracts_path,
            always_accept: false,
            tracking_size: 100,
            is_service: true,
            spec: None,
        };

        let (system_ref, runner) = system(
            config,
            SinkAuth::default(),
            "password",
            CancellationToken::new(),
            CancellationToken::new(),
        )
        .await
        .unwrap();

        (system_ref, runner, temp_dirs)
    }
}