1pub use error::SystemError;
2use std::{collections::HashMap, path::PathBuf, sync::Arc};
3
4use crate::{
5 config::{Config, GovernanceSyncConfig, SinkAuth, TrackerSyncConfig},
6 db::Database,
7 external_db::DBManager,
8 helpers::{db::ExternalDB, sink::AveSink},
9 model::common::contract::WasmRuntime,
10};
11use ave_actors::{
12 ActorSystem, DbManager, EncryptedKey, MachineSpec, SystemRef,
13};
14use ave_common::identity::hash_borsh;
15use serde::{Deserialize, Serialize};
16use tokio::{sync::RwLock, task::JoinHandle};
17use tokio_util::sync::CancellationToken;
18use tracing::error;
19use wasmtime::Module;
20
21pub mod error;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
24pub struct ConfigHelper {
25 pub contracts_path: PathBuf,
26 pub always_accept: bool,
27 pub safe_mode: bool,
28 pub tracking_size: usize,
29 pub sync_governance: GovernanceSyncConfig,
30 pub sync_tracker: TrackerSyncConfig,
31}
32
33impl From<Config> for ConfigHelper {
34 fn from(value: Config) -> Self {
35 Self {
36 contracts_path: value.contracts_path,
37 always_accept: value.always_accept,
38 safe_mode: value.safe_mode,
39 tracking_size: value.tracking_size,
40 sync_governance: value.sync.governance,
41 sync_tracker: value.sync.tracker,
42 }
43 }
44}
45
46pub async fn system(
47 config: Config,
48 sink_auth: SinkAuth,
49 password: &str,
50 graceful_token: CancellationToken,
51 crash_token: CancellationToken,
52) -> Result<(SystemRef, JoinHandle<()>), SystemError> {
53 let (system, mut runner) =
55 ActorSystem::create(graceful_token.clone(), crash_token.clone());
56
57 system
58 .add_helper("config", ConfigHelper::from(config.clone()))
59 .await;
60
61 let wasm_runtime = WasmRuntime::new(config.spec.clone())
63 .map_err(|e| SystemError::EngineCreation(e.to_string()))?;
64 system
65 .add_helper("wasm_runtime", Arc::new(wasm_runtime))
66 .await;
67
68 let contracts: HashMap<String, Arc<Module>> = HashMap::new();
69 system
70 .add_helper("contracts", Arc::new(RwLock::new(contracts)))
71 .await;
72
73 let actor_spec = config.spec.clone().map(MachineSpec::from);
74
75 let mut db = Database::open(&config.internal_db, actor_spec)
77 .map_err(|e| SystemError::DatabaseOpen(e.to_string()))?;
78 system.add_helper("store", db.clone()).await;
79
80 let api_key = if sink_auth.api_key.is_empty() {
82 None
83 } else {
84 Some(sink_auth.api_key.clone())
85 };
86 let ave_sink = AveSink::new(
87 sink_auth.sink.sinks,
88 sink_auth.token,
89 &sink_auth.sink.auth,
90 &sink_auth.sink.username,
91 &sink_auth.password,
92 api_key,
93 );
94 system.add_helper("sink", ave_sink).await;
95
96 let pass_hash =
97 hash_borsh(&*config.hash_algorithm.hasher(), &password.to_string())
98 .map_err(|e| SystemError::PasswordHash(e.to_string()))?;
99
100 let array_hash: [u8; 32] = pass_hash
101 .hash_array()
102 .map_err(|e| SystemError::HashArrayConversion(e.to_string()))?;
103
104 let encrypted_key = EncryptedKey::new(&array_hash)
106 .map_err(|e| SystemError::EncryptedKeyCreation(e.to_string()))?;
107
108 system.add_helper("encrypted_key", encrypted_key).await;
109
110 let db_manager_actor = system
111 .create_root_actor("db_manager", DBManager)
112 .await
113 .map_err(|e| SystemError::RootActorCreation(e.to_string()))?;
114
115 let ext_db = Arc::new(
116 ExternalDB::build(
117 config.external_db.db,
118 config.external_db.durability,
119 db_manager_actor,
120 config.spec.clone(),
121 )
122 .await
123 .map_err(|e| SystemError::ExternalDbBuild(e.to_string()))?,
124 );
125
126 system.add_helper("ext_db", Arc::clone(&ext_db)).await;
127
128 let runner = tokio::spawn(async move {
129 runner.run().await;
130 if let Err(e) = ext_db.shutdown().await {
131 error!(error = %e, "Failed to stop external db");
132 };
133 if let Err(e) = db.stop() {
134 error!(error = %e, "Failed to stop db");
135 };
136 });
137
138 Ok((system, runner))
139}
140
#[cfg(test)]
pub mod tests {
    //! Smoke test for [`system`] plus the `create_system` fixture, which is
    //! public so that it can be reused from other test modules.

    use ave_common::identity::{HashAlgorithm, KeyPairAlgorithm};
    use network::Config as NetworkConfig;
    use tempfile::TempDir;
    use test_log::test;

    use crate::config::{
        AveExternalDBConfig, AveExternalDBFeatureConfig, AveInternalDBConfig,
        AveInternalDBFeatureConfig, SyncConfig,
    };

    use super::*;

    /// Type that is never registered as a helper; used to check that looking
    /// up an unknown helper yields `None`.
    #[derive(Debug, Clone)]
    pub struct Dummy;

    /// The system exposes the helpers it registered and nothing else.
    #[test(tokio::test)]
    async fn test_system() {
        let (system, _runner, _dirs) = create_system().await;
        let db: Option<Database> = system.get_helper("store").await;
        assert!(db.is_some());
        let ep: Option<EncryptedKey> = system.get_helper("encrypted_key").await;
        assert!(ep.is_some());
        let any: Option<Dummy> = system.get_helper("dummy").await;
        assert!(any.is_none());
    }

    /// Creates a system backed by fresh temporary directories.
    ///
    /// Returns the system reference, the handle of the spawned runner task
    /// and the temporary directories (internal db, external db, contracts —
    /// in that order). The caller must keep the `TempDir` guards alive for
    /// as long as the system is used, since `tempfile` removes a directory
    /// when its guard is dropped.
    ///
    /// # Panics
    ///
    /// Panics if a temporary directory cannot be created or if [`system`]
    /// returns an error.
    pub async fn create_system() -> (SystemRef, JoinHandle<()>, Vec<TempDir>) {
        let mut vec_dirs = vec![];

        let dir_ave_db =
            tempfile::tempdir().expect("Can not create temporal directory");
        let ave_path = dir_ave_db.path().to_path_buf();
        vec_dirs.push(dir_ave_db);

        let dir_ext_db =
            tempfile::tempdir().expect("Can not create temporal directory");
        let ext_path = dir_ext_db.path().to_path_buf();
        vec_dirs.push(dir_ext_db);

        let dir_contracts =
            tempfile::tempdir().expect("Can not create temporal directory");
        let contracts_path = dir_contracts.path().to_path_buf();
        vec_dirs.push(dir_contracts);

        let network_config = NetworkConfig::new(
            network::NodeType::Bootstrap,
            vec![],
            vec![],
            vec![],
        );
        let config = Config {
            keypair_algorithm: KeyPairAlgorithm::Ed25519,
            hash_algorithm: HashAlgorithm::Blake3,
            internal_db: AveInternalDBConfig {
                db: AveInternalDBFeatureConfig::build(&ave_path),
                ..Default::default()
            },
            external_db: AveExternalDBConfig {
                db: AveExternalDBFeatureConfig::build(&ext_path),
                ..Default::default()
            },
            network: network_config,
            contracts_path,
            always_accept: false,
            tracking_size: 100,
            safe_mode: false,
            is_service: true,
            sync: SyncConfig {
                governance: GovernanceSyncConfig {
                    interval_secs: 60,
                    sample_size: 3,
                    response_timeout_secs: 30,
                },
                tracker: TrackerSyncConfig {
                    interval_secs: 60,
                    page_size: 200,
                    response_timeout_secs: 10,
                    update_batch_size: 2,
                    update_timeout_secs: 10,
                },
            },
            spec: None,
        };

        // `config` is not needed afterwards, so it is moved rather than cloned.
        let (sys, handlers) = system(
            config,
            SinkAuth::default(),
            "password",
            CancellationToken::new(),
            CancellationToken::new(),
        )
        .await
        .unwrap();

        (sys, handlers, vec_dirs)
    }
}