1pub use error::SystemError;
2use std::{collections::HashMap, path::PathBuf, sync::Arc};
3
4use crate::{
5 config::{
6 Config, GovernanceSyncConfig, RebootSyncConfig, SinkAuth,
7 TrackerSyncConfig, UpdateSyncConfig,
8 },
9 db::Database,
10 external_db::DBManager,
11 helpers::{db::ExternalDB, sink::AveSink},
12 model::common::contract::WasmRuntime,
13};
14use ave_actors::{
15 ActorSystem, DbManager, EncryptedKey, MachineSpec, SystemRef,
16};
17use ave_common::identity::hash_borsh;
18use serde::{Deserialize, Serialize};
19use tokio::{sync::RwLock, task::JoinHandle};
20use tokio_util::sync::CancellationToken;
21use tracing::error;
22use wasmtime::Module;
23
24pub mod error;
25
/// Reduced view of the node [`Config`] that is shared through the actor
/// system.
///
/// It is produced from a full [`Config`] by the `From<Config>` implementation
/// and registered by [`system`] under the `"config"` helper name. Every field
/// is copied unchanged from the `Config` entry named in its documentation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConfigHelper {
    /// Copied from `Config::contracts_path`.
    /// NOTE(review): presumably the directory where contracts live — confirm.
    pub contracts_path: PathBuf,
    /// Copied from `Config::always_accept`.
    pub always_accept: bool,
    /// Copied from `Config::safe_mode`.
    pub safe_mode: bool,
    /// Copied from `Config::tracking_size`.
    pub tracking_size: usize,
    /// Copied from `Config::only_clear_events`.
    pub only_clear_events: bool,
    /// Copied from `Config::sync.ledger_batch_size`.
    pub ledger_batch_size: usize,
    /// Copied from `Config::sync.governance`.
    pub sync_governance: GovernanceSyncConfig,
    /// Copied from `Config::sync.tracker`.
    pub sync_tracker: TrackerSyncConfig,
    /// Copied from `Config::sync.update`.
    pub sync_update: UpdateSyncConfig,
    /// Copied from `Config::sync.reboot`.
    pub sync_reboot: RebootSyncConfig,
}
39
40impl From<Config> for ConfigHelper {
41 fn from(value: Config) -> Self {
42 Self {
43 contracts_path: value.contracts_path,
44 always_accept: value.always_accept,
45 safe_mode: value.safe_mode,
46 tracking_size: value.tracking_size,
47 only_clear_events: value.only_clear_events,
48 ledger_batch_size: value.sync.ledger_batch_size,
49 sync_governance: value.sync.governance,
50 sync_tracker: value.sync.tracker,
51 sync_update: value.sync.update,
52 sync_reboot: value.sync.reboot,
53 }
54 }
55}
56
57pub async fn system(
58 config: Config,
59 sink_auth: SinkAuth,
60 password: &str,
61 graceful_token: CancellationToken,
62 crash_token: CancellationToken,
63) -> Result<(SystemRef, JoinHandle<()>), SystemError> {
64 let (system, mut runner) =
66 ActorSystem::create(graceful_token.clone(), crash_token.clone());
67
68 system
69 .add_helper("config", ConfigHelper::from(config.clone()))
70 .await;
71
72 let wasm_runtime = WasmRuntime::new(config.spec.clone())
74 .map_err(|e| SystemError::EngineCreation(e.to_string()))?;
75 system
76 .add_helper("wasm_runtime", Arc::new(wasm_runtime))
77 .await;
78
79 let contracts: HashMap<String, Arc<Module>> = HashMap::new();
80 system
81 .add_helper("contracts", Arc::new(RwLock::new(contracts)))
82 .await;
83
84 let actor_spec = config.spec.clone().map(MachineSpec::from);
85
86 let mut db = Database::open(&config.internal_db, actor_spec)
88 .map_err(|e| SystemError::DatabaseOpen(e.to_string()))?;
89 system.add_helper("store", db.clone()).await;
90
91 let api_key = if sink_auth.api_key.is_empty() {
93 None
94 } else {
95 Some(sink_auth.api_key.clone())
96 };
97 let ave_sink = AveSink::new(
98 sink_auth.sink.sinks,
99 sink_auth.token,
100 &sink_auth.sink.auth,
101 &sink_auth.sink.username,
102 &sink_auth.password,
103 api_key,
104 );
105 system.add_helper("sink", ave_sink).await;
106
107 let pass_hash =
108 hash_borsh(&*config.hash_algorithm.hasher(), &password.to_string())
109 .map_err(|e| SystemError::PasswordHash(e.to_string()))?;
110
111 let array_hash: [u8; 32] = pass_hash
112 .hash_array()
113 .map_err(|e| SystemError::HashArrayConversion(e.to_string()))?;
114
115 let encrypted_key = EncryptedKey::new(&array_hash)
117 .map_err(|e| SystemError::EncryptedKeyCreation(e.to_string()))?;
118
119 system.add_helper("encrypted_key", encrypted_key).await;
120
121 let db_manager_actor = system
122 .create_root_actor("db_manager", DBManager)
123 .await
124 .map_err(|e| SystemError::RootActorCreation(e.to_string()))?;
125
126 let ext_db = Arc::new(
127 ExternalDB::build(
128 config.external_db.db,
129 config.external_db.durability,
130 db_manager_actor,
131 config.spec.clone(),
132 )
133 .await
134 .map_err(|e| SystemError::ExternalDbBuild(e.to_string()))?,
135 );
136
137 system.add_helper("ext_db", Arc::clone(&ext_db)).await;
138
139 let runner = tokio::spawn(async move {
140 runner.run().await;
141 if let Err(e) = ext_db.shutdown().await {
142 error!(error = %e, "Failed to stop external db");
143 };
144 if let Err(e) = db.stop() {
145 error!(error = %e, "Failed to stop db");
146 };
147 });
148
149 Ok((system, runner))
150}
151
#[cfg(test)]
pub mod tests {

    use std::{
        env, process,
        sync::atomic::{AtomicU64, Ordering},
    };

    use ave_common::identity::{HashAlgorithm, KeyPairAlgorithm};
    use ave_network::Config as NetworkConfig;
    use tempfile::TempDir;
    use test_log::test;

    use crate::config::{
        AveExternalDBConfig, AveExternalDBFeatureConfig, AveInternalDBConfig,
        AveInternalDBFeatureConfig, SyncConfig,
    };

    use super::*;

    /// Counter mixed into the contracts directory prefix so that every call
    /// within one test process gets a different prefix.
    static CONTRACTS_COUNTER: AtomicU64 = AtomicU64::new(0);

    /// Creates a temporary contracts directory inside the OS temp directory.
    ///
    /// The directory name starts with the process id and the next value of
    /// [`CONTRACTS_COUNTER`]. Panics if the directory cannot be created.
    fn create_contracts_temp_dir() -> TempDir {
        let prefix = format!(
            "ave-test-contracts-{}-{}-",
            process::id(),
            CONTRACTS_COUNTER.fetch_add(1, Ordering::SeqCst)
        );

        tempfile::Builder::new()
            .prefix(&prefix)
            .tempdir_in(env::temp_dir())
            .expect("Can not create temporal directory")
    }

    /// Type that is never registered as a helper; used to check that looking
    /// up an unknown helper yields `None`.
    #[derive(Debug, Clone)]
    pub struct Dummy;

    /// The freshly built system exposes the `"store"` and `"encrypted_key"`
    /// helpers and returns `None` for a helper that was never added.
    #[test(tokio::test)]
    async fn test_system() {
        let (system, _runner, _dirs) = create_system().await;

        let store: Option<Database> = system.get_helper("store").await;
        let key: Option<EncryptedKey> =
            system.get_helper("encrypted_key").await;
        let missing: Option<Dummy> = system.get_helper("dummy").await;

        assert!(store.is_some());
        assert!(key.is_some());
        assert!(missing.is_none());
    }

    /// Builds a complete system backed by temporary directories.
    ///
    /// Returns the system reference, the runner task handle and the temporary
    /// directories (internal db, external db, contracts — in that order). The
    /// caller must keep the directories alive for as long as the system is
    /// used, because they are removed when dropped.
    pub async fn create_system() -> (SystemRef, JoinHandle<()>, Vec<TempDir>) {
        let internal_db_dir =
            tempfile::tempdir().expect("Can not create temporal directory");
        let internal_db_path = internal_db_dir.path().to_path_buf();

        let external_db_dir =
            tempfile::tempdir().expect("Can not create temporal directory");
        let external_db_path = external_db_dir.path().to_path_buf();

        let contracts_dir = create_contracts_temp_dir();
        let contracts_path = contracts_dir.path().to_path_buf();

        let network_config = NetworkConfig::new(
            ave_network::NodeType::Bootstrap,
            vec![],
            vec![],
            vec![],
        );

        let sync = SyncConfig {
            governance: GovernanceSyncConfig {
                interval_secs: 60,
                sample_size: 3,
                response_timeout_secs: 30,
            },
            tracker: TrackerSyncConfig {
                interval_secs: 60,
                page_size: 200,
                response_timeout_secs: 10,
                update_batch_size: 2,
                update_timeout_secs: 10,
            },
            update: UpdateSyncConfig::default(),
            reboot: RebootSyncConfig::default(),
            ledger_batch_size: 100,
        };

        let config = Config {
            keypair_algorithm: KeyPairAlgorithm::Ed25519,
            hash_algorithm: HashAlgorithm::Blake3,
            internal_db: AveInternalDBConfig {
                db: AveInternalDBFeatureConfig::build(&internal_db_path),
                ..Default::default()
            },
            external_db: AveExternalDBConfig {
                db: AveExternalDBFeatureConfig::build(&external_db_path),
                ..Default::default()
            },
            network: network_config,
            contracts_path,
            always_accept: false,
            tracking_size: 100,
            safe_mode: false,
            is_service: true,
            only_clear_events: false,
            sync,
            spec: None,
        };

        let (system_ref, runner) = system(
            config.clone(),
            SinkAuth::default(),
            "password",
            CancellationToken::new(),
            CancellationToken::new(),
        )
        .await
        .unwrap();

        (
            system_ref,
            runner,
            vec![internal_db_dir, external_db_dir, contracts_dir],
        )
    }
}