1use crate::runner::MockHeartbeatConsumer;
2use crate::Error;
3use crate::multi_node::{find_open_tcp_bind_port, MultiNodeTestEnv};
4use crate::{InputValue, OutputValue, keys::inject_tangle_key};
5use blueprint_auth::proxy::DEFAULT_AUTH_PROXY_PORT;
6use blueprint_chain_setup::tangle::testnet::SubstrateNode;
7use blueprint_chain_setup::tangle::transactions;
8use blueprint_chain_setup::tangle::transactions::setup_operators_with_service;
9use blueprint_client_tangle::client::TangleClient;
10use blueprint_contexts::tangle::TangleClientContext;
11use blueprint_core::debug;
12use blueprint_core_testing_utils::setup_log;
13use blueprint_crypto_tangle_pair_signer::TanglePairSigner;
14use blueprint_keystore::backends::Backend;
15use blueprint_keystore::crypto::sp_core::{SpEcdsa, SpSr25519};
16use blueprint_runner::config::BlueprintEnvironment;
17use blueprint_runner::config::ContextConfig;
18use blueprint_runner::config::SupportedChains;
19use blueprint_runner::error::RunnerError;
20use blueprint_runner::tangle::error::TangleError;
21use blueprint_std::io;
22use blueprint_std::path::{Path, PathBuf};
23use tangle_subxt::subxt::utils::AccountId32;
24use tangle_subxt::tangle_testnet_runtime::api::assets::events::created::AssetId;
25use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::pricing::PricingQuote;
26use tangle_subxt::tangle_testnet_runtime::api::runtime_types::tangle_primitives::services::types::{AssetSecurityCommitment, AssetSecurityRequirement};
27use blueprint_tangle_extra::serde::new_bounded_string;
28use std::marker::PhantomData;
29use std::net::Ipv4Addr;
30use std::sync::Arc;
31use tangle_subxt::tangle_testnet_runtime::api::services::calls::types::register::RegistrationArgs;
32use tangle_subxt::tangle_testnet_runtime::api::services::calls::types::request::RequestArgs;
33use tangle_subxt::tangle_testnet_runtime::api::services::events::JobCalled;
34use tangle_subxt::tangle_testnet_runtime::api::services::{
35 calls::types::{call::Job, register::Preferences},
36 events::JobResultSubmitted,
37};
38use tempfile::TempDir;
39use tokio::sync::Mutex;
40use tokio::task::JoinHandle;
41use blueprint_core::{error, info};
42use url::Url;
43use blueprint_auth::db::RocksDb;
44use blueprint_manager_bridge::server::{Bridge, BridgeHandle};
45use blueprint_pricing_engine_lib::{init_benchmark_cache, init_operator_signer, load_pricing_from_toml, OperatorConfig, DEFAULT_CONFIG};
46use blueprint_pricing_engine_lib::service::rpc::server::run_rpc_server;
47
48pub const ENDOWED_TEST_NAMES: [&str; 10] = [
49 "Alice",
50 "Bob",
51 "Charlie",
52 "Dave",
53 "Eve",
54 "Ferdinand",
55 "Gina",
56 "Hank",
57 "Ivy",
58 "Jack",
59];
60
61#[derive(Clone, Debug)]
63pub struct TangleTestConfig {
64 pub http_endpoint: Url,
65 pub ws_endpoint: Url,
66 pub temp_dir: PathBuf,
67 pub bridge_socket_path: PathBuf,
68}
69
70pub struct TangleTestHarness<Ctx = ()> {
72 client: TangleClient,
73 pub sr25519_signer: TanglePairSigner<sp_core::sr25519::Pair>,
74 pub ecdsa_signer: TanglePairSigner<sp_core::ecdsa::Pair>,
75 pub alloy_key: alloy_signer_local::PrivateKeySigner,
76 config: TangleTestConfig,
77 _phantom: PhantomData<Ctx>,
78
79 _temp_dir: tempfile::TempDir,
81 _node: SubstrateNode,
82 _auth_proxy: JoinHandle<Result<(), Error>>,
83 _bridge: BridgeHandle,
84}
85
86pub async fn generate_env_from_node_id(
96 identity: &str,
97 http_endpoint: Url,
98 ws_endpoint: Url,
99 test_dir: &Path,
100) -> Result<BlueprintEnvironment, RunnerError> {
101 let node_dir = test_dir.join(identity.to_ascii_lowercase());
102 let keystore_path = node_dir.join("keystore");
103 tokio::fs::create_dir_all(&keystore_path).await?;
104 inject_tangle_key(&keystore_path, &format!("//{identity}"))
105 .map_err(|err| RunnerError::Tangle(TangleError::Keystore(err)))?;
106
107 let data_dir = node_dir.join("data");
108 tokio::fs::create_dir_all(&data_dir).await?;
109
110 let context_config = ContextConfig::create_tangle_config(
112 http_endpoint,
113 ws_endpoint,
114 keystore_path.display().to_string(),
115 None,
116 data_dir,
117 None,
118 SupportedChains::LocalTestnet,
119 0,
120 Some(0),
121 );
122
123 let mut env =
125 BlueprintEnvironment::load_with_config(context_config).map_err(RunnerError::Config)?;
126
127 env.test_mode = true;
129
130 Ok(env)
131}
132
133impl<Ctx> TangleTestHarness<Ctx>
134where
135 Ctx: Clone + Send + Sync + 'static,
136{
137 pub async fn setup(test_dir: TempDir) -> Result<Self, Error> {
157 setup_log();
158 let node = blueprint_chain_setup::tangle::run(
160 blueprint_chain_setup::tangle::NodeConfig::new(false).with_log_target("evm", "trace"),
161 )
162 .await
163 .map_err(|e| Error::Setup(e.to_string()))?;
164 let http_endpoint = Url::parse(&format!("http://127.0.0.1:{}", node.ws_port()))?;
165 let ws_endpoint = Url::parse(&format!("ws://127.0.0.1:{}", node.ws_port()))?;
166
167 let (auth_proxy_db, auth_proxy_task) =
168 run_auth_proxy(test_dir.path().to_path_buf(), DEFAULT_AUTH_PROXY_PORT).await?;
169
170 let auth_proxy = tokio::spawn(auth_proxy_task);
171
172 let mut alice_env = generate_env_from_node_id(
174 ENDOWED_TEST_NAMES[0],
175 http_endpoint.clone(),
176 ws_endpoint.clone(),
177 test_dir.path(),
178 )
179 .await?;
180
181 let runtime_dir = test_dir.path().join("runtime");
183 tokio::fs::create_dir_all(&runtime_dir).await?;
184
185 let bridge = Bridge::new(runtime_dir, String::from("service"), auth_proxy_db, true);
186 let bridge_socket_path = bridge.base_socket_path();
187
188 let (bridge_handle, _alive_rx) = bridge.spawn()?;
189 alice_env.bridge_socket_path = Some(bridge_socket_path.clone());
190
191 let config = TangleTestConfig {
193 http_endpoint: http_endpoint.clone(),
194 ws_endpoint: ws_endpoint.clone(),
195 temp_dir: test_dir.path().to_path_buf(),
196 bridge_socket_path,
197 };
198
199 let keystore = alice_env.keystore();
201 let sr25519_public = keystore.first_local::<SpSr25519>()?;
202 let sr25519_pair = keystore.get_secret::<SpSr25519>(&sr25519_public)?;
203 let sr25519_signer = TanglePairSigner::new(sr25519_pair.0);
204
205 let ecdsa_public = keystore.first_local::<SpEcdsa>()?;
206 let ecdsa_pair = keystore.get_secret::<SpEcdsa>(&ecdsa_public)?;
207 let ecdsa_signer = TanglePairSigner::new(ecdsa_pair.0);
208 let alloy_key = ecdsa_signer
209 .alloy_key()
210 .map_err(|e| Error::Setup(e.to_string()))?;
211
212 let client = alice_env.tangle_client().await?;
213
214 let harness = TangleTestHarness {
215 client,
216 sr25519_signer,
217 ecdsa_signer,
218 alloy_key,
219 config,
220 _phantom: PhantomData,
221 _temp_dir: test_dir,
222 _node: node,
223 _auth_proxy: auth_proxy,
224 _bridge: bridge_handle,
225 };
226
227 harness
229 .deploy_mbsm_if_needed()
230 .await
231 .map_err(|e| Error::Setup(format!("Failed to deploy MBSM: {e}")))?;
232
233 Ok(harness)
234 }
235
236 #[must_use]
237 pub fn env(&self) -> &BlueprintEnvironment {
238 &self.client.config
239 }
240
241 #[must_use]
242 pub fn config(&self) -> &TangleTestConfig {
243 &self.config
244 }
245}
246
247struct NodeInfo {
248 env: BlueprintEnvironment,
249 client: TangleClient,
250 preferences: Preferences,
251 _pricing_rpc: JoinHandle<()>,
253}
254
255#[derive(Debug, Clone)]
256pub struct SetupServicesOpts<const N: usize> {
257 pub exit_after_registration: bool,
259 pub skip_service_request: bool,
261 pub registration_args: [RegistrationArgs; N],
263 pub request_args: RequestArgs,
265}
266
267impl<const N: usize> Default for SetupServicesOpts<N> {
268 fn default() -> Self {
269 Self {
270 exit_after_registration: false,
271 skip_service_request: false,
272 registration_args: vec![RegistrationArgs::default(); N].try_into().unwrap(),
273 request_args: RequestArgs::default(),
274 }
275 }
276}
277
278impl<Ctx> TangleTestHarness<Ctx>
279where
280 Ctx: Clone + Send + Sync + 'static,
281{
282 async fn get_all_node_info<const N: usize>(&self) -> Result<Vec<NodeInfo>, RunnerError> {
283 let mut nodes = vec![];
284
285 for name in &ENDOWED_TEST_NAMES[..N] {
286 let mut env = generate_env_from_node_id(
287 name,
288 self.config.http_endpoint.clone(),
289 self.config.ws_endpoint.clone(),
290 &self.config.temp_dir,
291 )
292 .await?;
293 env.bridge_socket_path = Some(self.config.bridge_socket_path.clone());
294
295 let client = env
296 .tangle_client()
297 .await
298 .map_err(|err| RunnerError::Other(err.into()))?;
299
300 let keystore = env.keystore();
301 let ecdsa_public = keystore
302 .first_local::<SpEcdsa>()
303 .map_err(|err| RunnerError::Tangle(TangleError::Keystore(err)))?;
304
305 let rpc_port = find_open_tcp_bind_port();
306 let rpc_address = format!("127.0.0.1:{rpc_port}");
307 info!("Binding node {name} to {rpc_address}");
308
309 let operator_config = OperatorConfig {
310 keystore_path: self.config.temp_dir.join(name),
311 rpc_port,
312 ..Default::default()
313 };
314
315 let benchmark_cache = init_benchmark_cache(&operator_config)
316 .await
317 .map_err(|e| RunnerError::Other(e.into()))?;
318
319 let pricing_config =
320 load_pricing_from_toml(DEFAULT_CONFIG).map_err(|e| RunnerError::Other(e.into()))?;
321
322 let operator_signer =
323 init_operator_signer(&operator_config, &operator_config.keystore_path)
324 .map_err(|e| RunnerError::Other(e.into()))?;
325
326 let pricing_rpc = tokio::spawn(async move {
327 if let Err(e) = run_rpc_server(
328 Arc::new(operator_config),
329 benchmark_cache,
330 Arc::new(Mutex::new(pricing_config)),
331 operator_signer,
332 )
333 .await
334 {
335 blueprint_core::error!("gRPC server error: {}", e);
336 }
337 });
338
339 let preferences = Preferences {
340 key: blueprint_runner::tangle::config::decompress_pubkey(&ecdsa_public.0.0)
341 .unwrap(),
342 rpc_address: new_bounded_string(rpc_address),
343 };
344
345 nodes.push(NodeInfo {
346 env,
347 client,
348 preferences,
349 _pricing_rpc: pricing_rpc,
350 });
351 }
352
353 Ok(nodes)
354 }
355
356 #[must_use]
358 pub fn client(&self) -> &TangleClient {
359 &self.client
360 }
361
362 async fn deploy_mbsm_if_needed(&self) -> Result<(), Error> {
364 let latest_revision = transactions::get_latest_mbsm_revision(&self.client)
365 .await
366 .map_err(|e| Error::Setup(e.to_string()))?;
367
368 if let Some((rev, addr)) = latest_revision {
369 debug!("MBSM is deployed at revision #{rev} at address {addr}");
370 return Ok(());
371 }
372
373 debug!("MBSM is not deployed");
374
375 let bytecode = tnt_core_bytecode::bytecode::MASTER_BLUEPRINT_SERVICE_MANAGER;
376 transactions::deploy_new_mbsm_revision(
377 self.config.ws_endpoint.as_str(),
378 &self.client,
379 &self.sr25519_signer,
380 self.alloy_key.clone(),
381 bytecode,
382 alloy_primitives::address!("0xdeadbeefdeadbeefdeadbeefdeadbeefdeadbeef"), )
384 .await
385 .map_err(|e| Error::Setup(e.to_string()))?;
386
387 Ok(())
388 }
389
390 pub fn create_deploy_opts(
398 &self,
399 manifest_path: PathBuf,
400 ) -> io::Result<blueprint_chain_setup::tangle::deploy::Opts> {
401 Ok(blueprint_chain_setup::tangle::deploy::Opts {
402 pkg_name: Some(self.get_blueprint_name(&manifest_path)?),
403 http_rpc_url: self.config.http_endpoint.to_string(),
404 ws_rpc_url: self.config.ws_endpoint.to_string(),
405 manifest_path,
406 signer: Some(self.sr25519_signer.clone()),
407 signer_evm: Some(self.alloy_key.clone()),
408 })
409 }
410
411 #[allow(clippy::unused_self)]
412 fn get_blueprint_name(&self, manifest_path: &std::path::Path) -> io::Result<String> {
413 let manifest = blueprint_core_testing_utils::read_cargo_toml_file(manifest_path)?;
414 Ok(manifest.package.unwrap().name)
415 }
416
417 pub async fn deploy_blueprint(&self) -> Result<u64, Error> {
425 let manifest_path = std::env::current_dir()?.join("Cargo.toml");
426 let opts = self.create_deploy_opts(manifest_path)?;
427 let blueprint_id = blueprint_chain_setup::tangle::deploy::deploy_to_tangle(opts)
428 .await
429 .map_err(|e| Error::Setup(e.to_string()))?;
430 Ok(blueprint_id)
431 }
432
433 pub async fn setup_services_with_options<const N: usize>(
445 &self,
446 SetupServicesOpts {
447 exit_after_registration,
448 skip_service_request,
449 registration_args,
450 request_args,
451 }: SetupServicesOpts<N>,
452 ) -> Result<(MultiNodeTestEnv<Ctx, MockHeartbeatConsumer>, u64, u64), Error> {
453 const { assert!(N > 0, "Must have at least 1 initial node") };
454
455 let blueprint_id = self.deploy_blueprint().await?;
457
458 let nodes = self.get_all_node_info::<N>().await?;
459
460 let service_id = if exit_after_registration {
462 0
463 } else {
464 let mut all_clients = Vec::new();
465 let mut all_signers = Vec::new();
466 let mut all_preferences = Vec::new();
467
468 for node in nodes {
469 let keystore = node.env.keystore();
470 let sr25519_public = keystore
471 .first_local::<SpSr25519>()
472 .map_err(|err| RunnerError::Tangle(TangleError::Keystore(err)))?;
473 let sr25519_pair = keystore
474 .get_secret::<SpSr25519>(&sr25519_public)
475 .map_err(|err| RunnerError::Tangle(TangleError::Keystore(err)))?;
476 let sr25519_signer = TanglePairSigner::new(sr25519_pair.0);
477 all_clients.push(node.client);
478 all_signers.push(sr25519_signer);
479 all_preferences.push(node.preferences);
480 }
481
482 setup_operators_with_service(
483 &all_clients[..N],
484 &all_signers[..N],
485 blueprint_id,
486 &all_preferences,
487 ®istration_args,
488 request_args.clone(),
489 exit_after_registration,
490 skip_service_request,
491 )
492 .await
493 .map_err(|e| Error::Setup(e.to_string()))?
494 };
495
496 let executor = MultiNodeTestEnv::new::<N>(self.config.clone());
498
499 Ok((executor, service_id, blueprint_id))
500 }
501
502 pub async fn setup_services<const N: usize>(
518 &self,
519 exit_after_registration: bool,
520 ) -> Result<(MultiNodeTestEnv<Ctx, MockHeartbeatConsumer>, u64, u64), Error> {
521 const { assert!(N > 0, "Must have at least 1 initial node") };
522
523 self.setup_services_with_options::<N>(SetupServicesOpts {
524 exit_after_registration,
525 skip_service_request: false,
526 registration_args: vec![RegistrationArgs::default(); N].try_into().unwrap(),
527 request_args: RequestArgs::default(),
528 })
529 .await
530 }
531
532 #[allow(clippy::too_many_arguments)]
546 pub async fn request_service_with_quotes(
547 &self,
548 blueprint_id: u64,
549 request_args: RequestArgs,
550 operators: Vec<AccountId32>,
551 quotes: Vec<PricingQuote>,
552 quote_signatures: Vec<sp_core::ecdsa::Signature>,
553 security_commitments: Vec<AssetSecurityCommitment<AssetId>>,
554 optional_assets: Option<Vec<AssetSecurityRequirement<AssetId>>>,
555 ) -> Result<u64, Error> {
556 transactions::request_service_with_quotes(
557 &self.client,
558 &self.sr25519_signer,
559 blueprint_id,
560 request_args,
561 operators,
562 quotes,
563 quote_signatures,
564 security_commitments,
565 optional_assets,
566 )
567 .await
568 .map_err(|e| Error::Setup(e.to_string()))
569 }
570
571 pub async fn submit_job(
585 &self,
586 service_id: u64,
587 job_id: u8,
588 inputs: Vec<InputValue>,
589 ) -> Result<JobCalled, Error> {
590 let job = transactions::submit_job(
591 &self.client,
592 &self.sr25519_signer,
593 service_id,
594 Job::from(job_id),
595 inputs,
596 0, )
598 .await
599 .map_err(|e| Error::Setup(e.to_string()))?;
600
601 Ok(job)
602 }
603
604 pub async fn wait_for_job_execution(
617 &self,
618 service_id: u64,
619 job: JobCalled,
620 ) -> Result<JobResultSubmitted, Error> {
621 let results = transactions::wait_for_completion_of_tangle_job(
622 &self.client,
623 service_id,
624 job.call_id,
625 1,
626 )
627 .await
628 .map_err(|e| Error::Setup(e.to_string()))?;
629
630 Ok(results)
631 }
632
633 pub fn verify_job(&self, results: &JobResultSubmitted, expected: impl AsRef<[OutputValue]>) {
646 assert_eq!(
647 results.result.len(),
648 expected.as_ref().len(),
649 "Number of outputs doesn't match expected"
650 );
651
652 for (result, expected) in results.result.iter().zip(expected.as_ref().iter()) {
653 assert_eq!(result, expected);
654 }
655 }
656}
657
658async fn run_auth_proxy(
675 data_dir: PathBuf,
676 auth_proxy_port: u16,
677) -> Result<(RocksDb, impl Future<Output = Result<(), Error>>), Error> {
678 let db_path = data_dir.join("private").join("auth-proxy").join("db");
679 tokio::fs::create_dir_all(&db_path).await?;
680
681 let proxy = blueprint_auth::proxy::AuthenticatedProxy::new(&db_path)?;
682 let db = proxy.db();
683
684 let task = async move {
685 let router = proxy.router();
686 let listener =
687 tokio::net::TcpListener::bind((Ipv4Addr::LOCALHOST, auth_proxy_port)).await?;
688 info!(
689 "Auth proxy listening on {}:{}",
690 Ipv4Addr::LOCALHOST,
691 auth_proxy_port
692 );
693 let result = axum::serve(listener, router).await;
694 if let Err(err) = result {
695 error!("Auth proxy error: {err}");
696 }
697
698 Ok(())
699 };
700
701 Ok((db, task))
702}
703
704#[cfg(test)]
705mod tests {
706 use super::*;
707
708 #[tokio::test]
709 async fn test_harness_setup() {
710 let test_dir = TempDir::new().unwrap();
711 let harness = Box::pin(TangleTestHarness::<()>::setup(test_dir)).await;
712 assert!(harness.is_ok(), "Harness setup should succeed");
713
714 let harness = harness.unwrap();
715 assert!(
716 harness.client().now().await.is_some(),
717 "Client should be connected to live chain"
718 );
719 }
720
721 #[tokio::test]
722 async fn test_deploy_mbsm() {
723 let test_dir = TempDir::new().unwrap();
724 let harness = Box::pin(TangleTestHarness::<()>::setup(test_dir))
725 .await
726 .unwrap();
727
728 let latest_revision = transactions::get_latest_mbsm_revision(harness.client())
730 .await
731 .unwrap();
732 assert!(latest_revision.is_some(), "MBSM should be deployed");
733 }
734}