#![allow(dead_code)]
use blueprint_contexts::tangle::TangleClientContext;
use blueprint_core::Job;
use blueprint_core_testing_utils::runner::{TestEnv, TestRunner};
use blueprint_crypto_tangle_pair_signer::TanglePairSigner;
use blueprint_keystore::backends::Backend;
use blueprint_keystore::crypto::sp_core::SpSr25519;
use blueprint_qos::heartbeat::HeartbeatConsumer;
use blueprint_qos::heartbeat::HeartbeatStatus;
use blueprint_qos::{QoSConfig, QoSService};
use blueprint_runner::BackgroundService;
use blueprint_runner::config::BlueprintEnvironment;
use blueprint_runner::config::Multiaddr;
use blueprint_runner::error::{JobCallError, RunnerError as Error};
use blueprint_runner::tangle::config::TangleConfig;
use blueprint_std::fmt::{Debug, Formatter};
use blueprint_std::pin::Pin;
use blueprint_std::sync::Arc;
use blueprint_tangle_extra::consumer::TangleConsumer;
use blueprint_tangle_extra::producer::TangleProducer;
use tokio::sync::Mutex;
use tokio::task::JoinHandle;
pub struct TangleTestEnv<Ctx, C>
where
C: HeartbeatConsumer + Send + Sync + 'static,
{
pub runner: Option<TestRunner<Ctx>>,
pub config: TangleConfig,
pub env: BlueprintEnvironment,
pub runner_handle: Mutex<Option<JoinHandle<Result<(), Error>>>>,
pub qos_config: Option<QoSConfig>,
pub qos_service: Option<Arc<QoSService<C>>>,
}
impl<Ctx, C> TangleTestEnv<Ctx, C>
where
C: HeartbeatConsumer + Send + Sync + 'static,
{
pub(crate) fn update_networking_config(
&mut self,
bootnodes: Vec<Multiaddr>,
network_bind_port: u16,
) {
self.env.bootnodes = bootnodes;
self.env.network_bind_port = network_bind_port;
}
pub fn set_qos_config(&mut self, config: QoSConfig) {
self.qos_config = Some(config);
}
pub fn set_qos_service(&mut self, service: Arc<QoSService<C>>) {
self.qos_service = Some(service);
}
pub(crate) async fn set_tangle_producer_consumer(&mut self) {
let runner = self.runner.as_mut().expect("Runner already running");
let builder = runner.builder.take().expect("Runner already running");
let tangle_client = self
.env
.tangle_client()
.await
.expect("Tangle node should be running");
let producer = TangleProducer::finalized_blocks(tangle_client.rpc_client.clone())
.await
.expect("Failed to create producer");
let sr25519_signer = self
.env
.keystore()
.first_local::<SpSr25519>()
.expect("key not found");
let sr25519_pair = self
.env
.keystore()
.get_secret::<SpSr25519>(&sr25519_signer)
.expect("key not found");
let sr25519_signer = TanglePairSigner::new(sr25519_pair.0);
let consumer = TangleConsumer::new(tangle_client.rpc_client.clone(), sr25519_signer);
runner.builder = Some(builder.producer(producer).consumer(consumer));
}
}
impl<Ctx, C> Debug for TangleTestEnv<Ctx, C>
where
C: HeartbeatConsumer + Send + Sync + 'static,
{
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TangleTestEnv")
.field("config", &self.config)
.field("env", &self.env)
.finish_non_exhaustive()
}
}
impl<Ctx, C> TestEnv for TangleTestEnv<Ctx, C>
where
Ctx: Clone + Send + Sync + 'static,
C: HeartbeatConsumer + Send + Sync + 'static,
{
type Config = TangleConfig;
type Context = Ctx;
fn new(config: Self::Config, env: BlueprintEnvironment) -> Result<Self, Error> {
let runner = TestRunner::<Ctx>::new::<Self::Config>(config.clone(), env.clone());
Ok(Self {
runner: Some(runner),
config,
env,
runner_handle: Mutex::new(None),
qos_config: None,
qos_service: None,
})
}
fn add_job<J, T>(&mut self, job: J)
where
J: Job<T, Self::Context> + Send + Sync + 'static,
T: 'static,
{
self.runner
.as_mut()
.expect("Runner already running")
.add_job(job);
}
fn add_background_service<B>(&mut self, service: B)
where
B: BackgroundService + Send + 'static,
{
self.runner
.as_mut()
.expect("Runner already running")
.add_background_service(service);
}
fn get_blueprint_config(&self) -> BlueprintEnvironment {
self.env.clone()
}
async fn run_runner(&mut self, context: Self::Context) -> Result<(), Error> {
let mut runner = self.runner.take().expect("Runner already running");
if let Some(qos_service_arc) = &self.qos_service {
runner.qos_service(qos_service_arc.clone());
}
let handle = tokio::spawn(async move { runner.run(context).await });
let mut guard = self.runner_handle.lock().await;
*guard = Some(handle);
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
let handle = guard.take().expect("was just set");
if !handle.is_finished() {
*guard = Some(handle);
blueprint_core::info!("Runner started successfully");
return Ok(());
}
blueprint_core::info!("Runner task finished");
match handle.await {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => {
blueprint_core::error!("Runner failed during startup: {}", e);
Err(e)
}
Err(e) => {
blueprint_core::error!("Runner task panicked: {}", e);
Err(JobCallError::JobDidntFinish(e).into())
}
}
}
}
impl<Ctx, C> Drop for TangleTestEnv<Ctx, C>
where
C: HeartbeatConsumer + Send + Sync + 'static,
{
fn drop(&mut self) {
futures::executor::block_on(async {
let mut guard = self.runner_handle.lock().await;
if let Some(handle) = guard.take() {
handle.abort();
}
});
}
}
#[derive(Clone, Default)]
pub struct MockHeartbeatConsumer {
pub heartbeats: Arc<Mutex<Vec<HeartbeatStatus>>>,
}
impl MockHeartbeatConsumer {
#[must_use]
pub fn new() -> Self {
Self {
heartbeats: Arc::new(Mutex::new(Vec::new())),
}
}
#[must_use]
pub async fn heartbeat_count(&self) -> usize {
self.heartbeats.lock().await.len()
}
#[must_use]
pub async fn get_heartbeats(&self) -> Vec<HeartbeatStatus> {
self.heartbeats.lock().await.clone()
}
}
impl HeartbeatConsumer for MockHeartbeatConsumer {
fn send_heartbeat(
&self,
status: &HeartbeatStatus,
) -> Pin<Box<dyn std::future::Future<Output = Result<(), blueprint_qos::error::Error>> + Send>>
{
let status = status.clone();
let heartbeats_store = self.heartbeats.clone();
Box::pin(async move {
heartbeats_store.lock().await.push(status.clone());
Ok(())
})
}
}