1pub use error::SystemError;
2use std::{collections::HashMap, path::PathBuf, sync::Arc};
3
4use crate::{
5 config::{Config, SinkAuth},
6 db::Database,
7 external_db::DBManager,
8 helpers::{db::ExternalDB, sink::AveSink},
9 model::common::contract::WasmRuntime,
10};
11use ave_actors::{
12 ActorSystem, DbManager, EncryptedKey, MachineSpec, SystemRef,
13};
14use ave_common::identity::hash_borsh;
15use serde::{Deserialize, Serialize};
16use tokio::{sync::RwLock, task::JoinHandle};
17use tokio_util::sync::CancellationToken;
18use tracing::error;
19
20pub mod error;
21
22#[derive(Debug, Clone, Serialize, Deserialize)]
23pub struct ConfigHelper {
24 pub contracts_path: PathBuf,
25 pub always_accept: bool,
26 pub tracking_size: usize,
27}
28
29impl From<Config> for ConfigHelper {
30 fn from(value: Config) -> Self {
31 Self {
32 contracts_path: value.contracts_path,
33 always_accept: value.always_accept,
34 tracking_size: value.tracking_size,
35 }
36 }
37}
38
39pub async fn system(
40 config: Config,
41 sink_auth: SinkAuth,
42 password: &str,
43 graceful_token: CancellationToken,
44 crash_token: CancellationToken,
45) -> Result<(SystemRef, JoinHandle<()>), SystemError> {
46 let (system, mut runner) =
48 ActorSystem::create(graceful_token.clone(), crash_token.clone());
49
50 system
51 .add_helper("config", ConfigHelper::from(config.clone()))
52 .await;
53
54 let wasm_runtime = WasmRuntime::new(config.spec.clone())
56 .map_err(|e| SystemError::EngineCreation(e.to_string()))?;
57 system
58 .add_helper("wasm_runtime", Arc::new(wasm_runtime))
59 .await;
60
61 let contracts: HashMap<String, Vec<u8>> = HashMap::new();
62 system
63 .add_helper("contracts", Arc::new(RwLock::new(contracts)))
64 .await;
65
66 let actor_spec = config.spec.clone().map(MachineSpec::from);
67
68 let mut db = Database::open(&config.internal_db, actor_spec)
70 .map_err(|e| SystemError::DatabaseOpen(e.to_string()))?;
71 system.add_helper("store", db.clone()).await;
72
73 let api_key = if sink_auth.api_key.is_empty() {
75 None
76 } else {
77 Some(sink_auth.api_key.clone())
78 };
79 let ave_sink = AveSink::new(
80 sink_auth.sink.sinks,
81 sink_auth.token,
82 &sink_auth.sink.auth,
83 &sink_auth.sink.username,
84 &sink_auth.password,
85 api_key,
86 );
87 system.add_helper("sink", ave_sink).await;
88
89 let pass_hash =
90 hash_borsh(&*config.hash_algorithm.hasher(), &password.to_string())
91 .map_err(|e| SystemError::PasswordHash(e.to_string()))?;
92
93 let array_hash: [u8; 32] = pass_hash
94 .hash_array()
95 .map_err(|e| SystemError::HashArrayConversion(e.to_string()))?;
96
97 let encrypted_key = EncryptedKey::new(&array_hash)
99 .map_err(|e| SystemError::EncryptedKeyCreation(e.to_string()))?;
100
101 system.add_helper("encrypted_key", encrypted_key).await;
102
103 let db_manager_actor = system
104 .create_root_actor("db_manager", DBManager)
105 .await
106 .map_err(|e| SystemError::RootActorCreation(e.to_string()))?;
107
108 let ext_db = Arc::new(
109 ExternalDB::build(
110 config.external_db.db,
111 config.external_db.durability,
112 db_manager_actor,
113 config.spec.clone(),
114 )
115 .await
116 .map_err(|e| SystemError::ExternalDbBuild(e.to_string()))?,
117 );
118
119 system.add_helper("ext_db", Arc::clone(&ext_db)).await;
120
121 let runner = tokio::spawn(async move {
122 runner.run().await;
123 if let Err(e) = ext_db.shutdown().await {
124 error!(error = %e, "Failed to stop external db");
125 };
126 if let Err(e) = db.stop() {
127 error!(error = %e, "Failed to stop db");
128 };
129 });
130
131 Ok((system, runner))
132}
133
#[cfg(test)]
pub mod tests {
    use ave_common::identity::{HashAlgorithm, KeyPairAlgorithm};
    use network::Config as NetworkConfig;
    use tempfile::TempDir;
    use test_log::test;

    use crate::config::{
        AveExternalDBConfig, AveExternalDBFeatureConfig, AveInternalDBConfig,
        AveInternalDBFeatureConfig,
    };

    use super::*;

    /// Marker type that `system` never registers; used to check that looking
    /// up an unknown helper yields `None`.
    #[derive(Debug, Clone)]
    pub struct Dummy;

    /// The helpers registered by `system` can be fetched back, and an
    /// unregistered key yields nothing.
    #[test(tokio::test)]
    async fn test_system() {
        let (system, _runner, _dirs) = create_system().await;

        let store: Option<Database> = system.get_helper("store").await;
        assert!(store.is_some());

        let key: Option<EncryptedKey> =
            system.get_helper("encrypted_key").await;
        assert!(key.is_some());

        let missing: Option<Dummy> = system.get_helper("dummy").await;
        assert!(missing.is_none());
    }

    /// Builds a system whose internal database, external database and
    /// contracts path each point at a fresh temporary directory.
    ///
    /// The `TempDir` guards are returned (internal db, external db,
    /// contracts — in that order) so the caller can keep them alive for as
    /// long as the system is in use.
    pub async fn create_system() -> (SystemRef, JoinHandle<()>, Vec<TempDir>) {
        let mut temp_dirs: Vec<TempDir> = Vec::with_capacity(3);

        // Creates a temporary directory, stores its guard and hands back the
        // path.
        let mut fresh_dir = || {
            let dir = tempfile::tempdir()
                .expect("Can not create temporal directory");
            let path = dir.path().to_path_buf();
            temp_dirs.push(dir);
            path
        };

        let ave_path = fresh_dir();
        let ext_path = fresh_dir();
        let contracts_path = fresh_dir();

        let config = Config {
            keypair_algorithm: KeyPairAlgorithm::Ed25519,
            hash_algorithm: HashAlgorithm::Blake3,
            internal_db: AveInternalDBConfig {
                db: AveInternalDBFeatureConfig::build(&ave_path),
                ..Default::default()
            },
            external_db: AveExternalDBConfig {
                db: AveExternalDBFeatureConfig::build(&ext_path),
                ..Default::default()
            },
            network: NetworkConfig::new(
                network::NodeType::Bootstrap,
                vec![],
                vec![],
                vec![],
            ),
            contracts_path,
            always_accept: false,
            tracking_size: 100,
            is_service: true,
            spec: None,
        };

        let (system_ref, runner) = system(
            config,
            SinkAuth::default(),
            "password",
            CancellationToken::new(),
            CancellationToken::new(),
        )
        .await
        .unwrap();

        (system_ref, runner, temp_dirs)
    }
}