gadget_common/
lib.rs

1use crate::config::ProtocolConfig;
2use crate::environments::EventMetadata;
3use crate::module::network::Network;
4use crate::module::{GadgetProtocol, GeneralModule};
5use crate::prelude::PrometheusConfig;
6use gadget_core::gadget::general::GeneralGadget;
7use gadget_core::gadget::manager::{AbstractGadget, GadgetError, GadgetManager};
8pub use gadget_core::job::JobError;
9pub use gadget_core::job::*;
10pub use gadget_core::job_manager::WorkManagerInterface;
11pub use gadget_core::job_manager::{PollMethod, ProtocolWorkManager, WorkManagerError};
12use gadget_io::tokio::task::JoinError;
13use parking_lot::RwLock;
14use sp_core::ecdsa;
15use std::fmt::{Debug, Display, Formatter};
16use std::sync::Arc;
17
18pub use subxt_signer;
19pub mod environments;
20use crate::environments::GadgetEnvironment;
21use gadget_core::gadget::general::Client;
22pub mod module;
23
24#[allow(ambiguous_glob_reexports)]
25pub mod prelude {
26    pub use crate::client::*;
27    pub use crate::config::*;
28    pub use crate::environments::*;
29    pub use crate::full_protocol::{FullProtocolConfig, NodeInput};
30    pub use crate::generate_setup_and_run_command;
31    pub use crate::keystore::{ECDSAKeyStore, InMemoryBackend, KeystoreBackend};
32    pub use crate::module::WorkManagerConfig;
33    pub use crate::{BuiltExecutableJobWrapper, JobBuilder, JobError, WorkManagerInterface};
34    pub use async_trait::async_trait;
35    pub use gadget_core::job_manager::ProtocolWorkManager;
36    pub use gadget_core::job_manager::SendFuture;
37    pub use gadget_core::job_manager::WorkManagerError;
38    pub use gadget_io::tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
39    pub use parking_lot::Mutex;
40    pub use sp_runtime::traits::Block;
41    pub use std::pin::Pin;
42    pub use std::sync::Arc;
43}
44
45// Convenience re-exports
46pub use async_trait;
47pub use color_eyre;
48pub use gadget_io;
49pub use tangle_subxt;
50
51pub mod tangle_runtime {
52    pub use tangle_subxt::subxt::utils::AccountId32;
53    pub use tangle_subxt::tangle_testnet_runtime::api;
54    pub use tangle_subxt::tangle_testnet_runtime::api::runtime_types::{
55        bounded_collections::bounded_vec::BoundedVec,
56    };
57}
58
59pub mod channels;
60pub mod client;
61pub mod config;
62pub mod debug_logger;
63pub mod full_protocol;
64pub mod helpers;
65pub mod keystore;
66pub mod locks;
67pub mod prometheus;
68pub mod protocol;
69pub mod tracer;
70pub mod utils;
71
72#[derive(Debug)]
73pub enum Error {
74    RegistryCreateError { err: String },
75    RegistrySendError { err: String },
76    RegistryRecvError { err: String },
77    RegistrySerializationError { err: String },
78    RegistryListenError { err: String },
79    GadgetManagerError { err: GadgetError },
80    InitError { err: String },
81    WorkManagerError { err: WorkManagerError },
82    ProtocolRemoteError { err: String },
83    ClientError { err: String },
84    JobError { err: JobError },
85    NetworkError { err: String },
86    KeystoreError { err: String },
87    MissingNetworkId,
88    PeerNotFound { id: ecdsa::Public },
89    JoinError { err: JoinError },
90    ParticipantNotSelected { id: ecdsa::Public, reason: String },
91    PrometheusError { err: String },
92    Other { err: String },
93}
94
95impl From<String> for crate::Error {
96    fn from(err: String) -> Self {
97        Self::Other { err }
98    }
99}
100
101impl Display for Error {
102    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103        Debug::fmt(self, f)
104    }
105}
106
107impl std::error::Error for Error {}
108
109impl From<JobError> for Error {
110    fn from(err: JobError) -> Self {
111        Error::JobError { err }
112    }
113}
114
115pub async fn run_protocol<Env: GadgetEnvironment, T: ProtocolConfig<Env>>(
116    mut protocol_config: T,
117) -> Result<(), Error> {
118    let client = protocol_config.take_client();
119    let network = protocol_config.take_network();
120    let protocol = protocol_config.take_protocol();
121
122    let prometheus_config = protocol_config.prometheus_config();
123
124    // Before running, wait for the first finality notification we receive
125    let latest_finality_notification = get_latest_event_from_client::<Env>(&client).await?;
126    let work_manager = create_work_manager(&latest_finality_notification, &protocol).await?;
127    let proto_module = GeneralModule::new(network.clone(), protocol, work_manager);
128    // Plug the module into the general gadget to interface the WebbGadget with Substrate
129    let substrate_gadget = GeneralGadget::new(client, proto_module);
130    let network_future = network.run();
131    let gadget_future = async move {
132        // Poll the first finality notification to ensure clients can execute without having to wait
133        // for another block to be produced
134        if let Err(err) = substrate_gadget
135            .on_event_received(latest_finality_notification)
136            .await
137        {
138            substrate_gadget.process_error(err).await;
139        }
140
141        GadgetManager::new(substrate_gadget)
142            .await
143            .map_err(|err| Error::GadgetManagerError { err })
144    };
145
146    if let Err(err) = prometheus::setup(prometheus_config.clone()).await {
147        protocol_config
148            .logger()
149            .warn(format!("Error setting up prometheus: {err:?}"));
150    } else if let PrometheusConfig::Enabled { bind_addr } = prometheus_config {
151        protocol_config
152            .logger()
153            .info(format!("Prometheus enabled on {bind_addr}"));
154    }
155
156    // Run both the network and the gadget together
157    gadget_io::tokio::try_join!(network_future, gadget_future).map(|_| ())
158}
159
160/// Creates a work manager
161pub async fn create_work_manager<Env: GadgetEnvironment, P: GadgetProtocol<Env>>(
162    latest_event: &<Env as GadgetEnvironment>::Event,
163    protocol: &P,
164) -> Result<ProtocolWorkManager<<Env as GadgetEnvironment>::WorkManager>, Error> {
165    let now = latest_event.number();
166
167    let work_manager_config = protocol.get_work_manager_config();
168
169    let clock = Arc::new(RwLock::new(Some(now)));
170
171    let job_manager = protocol.generate_work_manager(clock.clone()).await;
172
173    let poll_method = match work_manager_config.interval {
174        Some(interval) => PollMethod::Interval {
175            millis: interval.as_millis() as u64,
176        },
177        None => PollMethod::Manual,
178    };
179
180    Ok(ProtocolWorkManager::new(
181        job_manager,
182        work_manager_config.max_active_tasks,
183        work_manager_config.max_pending_tasks,
184        poll_method,
185    ))
186}
187
188async fn get_latest_event_from_client<Env: GadgetEnvironment>(
189    client: &<Env as GadgetEnvironment>::Client,
190) -> Result<<Env as GadgetEnvironment>::Event, Error> {
191    Client::<Env::Event>::latest_event(client)
192        .await
193        .ok_or_else(|| Error::InitError {
194            err: "No event received".to_string(),
195        })
196}
197
198#[macro_export]
199/// Generates a run function that returns a future that runs all the supplied protocols run concurrently
200/// Also generates a setup_node function that sets up the future that runs all the protocols concurrently
201#[allow(clippy::crate_in_macro_def)]
202macro_rules! generate_setup_and_run_command {
203    ($( $config:ident ),*) => {
204        /// Sets up a future that runs all the protocols concurrently
205        pub fn setup_node<Env: GadgetEnvironment, N: Network<Env>, KBE: $crate::keystore::KeystoreBackend, D: Send + Clone + 'static>(node_input: NodeInput<Env, N, KBE, D>) -> impl SendFuture<'static, ()>
206        {
207            async move {
208                if let Err(err) = run(
209                    node_input.clients,
210                    node_input.tx_manager,
211                    node_input.networks,
212                    node_input.logger.clone(),
213                    node_input.account_id,
214                    node_input.keystore,
215                    node_input.prometheus_config,
216                )
217                .await
218                {
219                    node_input
220                        .logger
221                        .error(format!("Error running gadget: {:?}", err));
222                }
223            }
224        }
225
226        pub async fn run<Env: GadgetEnvironment, N: Network<Env>, KBE: $crate::keystore::KeystoreBackend>(
227            client: Vec<Env::Client>,
228            tx_manager: <Env as GadgetEnvironment>::TransactionManager,
229            networks: Vec<N>,
230            logger: DebugLogger,
231            account_id: sp_core::sr25519::Public,
232            key_store: ECDSAKeyStore<KBE>,
233            prometheus_config: $crate::prometheus::PrometheusConfig,
234        ) -> Result<(), Error>
235        {
236            use futures::TryStreamExt;
237            let futures = futures::stream::FuturesUnordered::new();
238            let mut networks: std::collections::VecDeque<_> = networks.into_iter().collect();
239            let mut clients: std::collections::VecDeque<_> = client.into_iter().collect();
240
241            $(
242                let config = crate::$config::new(clients.pop_front().expect("Not enough clients"), tx_manager.clone(), networks.pop_front().expect("Not enough networks"), logger.clone(), account_id.clone(), key_store.clone(), prometheus_config.clone()).await?;
243                futures.push(Box::pin(config.execute()) as std::pin::Pin<Box<dyn SendFuture<'static, Result<(), $crate::Error>>>>);
244            )*
245
246            if let Err(err) = futures.try_collect::<Vec<_>>().await.map(|_| ()) {
247                Err(err)
248            } else {
249                Ok(())
250            }
251        }
252    };
253}
254
255#[macro_export]
256macro_rules! generate_protocol {
257    ($name:expr, $struct_name:ident, $async_proto_params:ty, $proto_gen_path:expr, $create_job_path:expr, $phase_filter:pat, $( $role_filter:pat ),*) => {
258        #[protocol]
259        pub struct $struct_name<
260            Env: GadgetEnvironment,
261            N: Network<Env>,
262            KBE: KeystoreBackend,
263        > {
264            tx_manager: <Env as GadgetEnvironment>::TransactionManager,
265            logger: DebugLogger,
266            client: <Env as GadgetEnvironment>::Client,
267            /// This field should NEVER be used directly. Use Self instead as the network
268            network_inner: N,
269            account_id: sp_core::sr25519::Public,
270            key_store: ECDSAKeyStore<KBE>,
271            jobs_client: Arc<Mutex<Option<JobsClient<Env>>>>,
272            prometheus_config: $crate::prometheus::PrometheusConfig,
273        }
274
275        #[async_trait]
276        impl<
277                Env: GadgetEnvironment,
278                N: Network<Env>,
279                KBE: KeystoreBackend,
280            > FullProtocolConfig<Env> for $struct_name<Env, N, KBE>
281        {
282            type AsyncProtocolParameters = $async_proto_params;
283            type Network = N;
284            type AdditionalNodeParameters = ();
285            type KeystoreBackend = KBE;
286
287            async fn new(
288                client: <Env as GadgetEnvironment>::Client,
289                tx_manager: <Env as GadgetEnvironment>::TransactionManager,
290                network_inner: Self::Network,
291                logger: DebugLogger,
292                account_id: sp_core::sr25519::Public,
293                key_store: ECDSAKeyStore<Self::KeystoreBackend>,
294                prometheus_config: $crate::prometheus::PrometheusConfig,
295            ) -> Result<Self, Error> {
296                let logger = if logger.id.is_empty() {
297                    DebugLogger { id: stringify!($name).replace("\"", "").into() }
298                } else {
299                    DebugLogger { id: (logger.id + " | " + stringify!($name)).replace("\"", "") }
300                };
301                Ok(Self {
302                    tx_manager,
303                    logger,
304                    client,
305                    network_inner,
306                    account_id,
307                    key_store,
308                    prometheus_config,
309                    jobs_client: Arc::new(parking_lot::Mutex::new(None)),
310                })
311            }
312
313            async fn generate_protocol_from(
314                &self,
315                associated_block_id: <<Env as GadgetEnvironment>::WorkManager as WorkManagerInterface>::Clock,
316                associated_retry_id: <<Env as GadgetEnvironment>::WorkManager as WorkManagerInterface>::RetryID,
317                associated_session_id: <<Env as GadgetEnvironment>::WorkManager as WorkManagerInterface>::SessionID,
318                associated_task_id: <<Env as GadgetEnvironment>::WorkManager as WorkManagerInterface>::TaskID,
319                protocol_message_rx: UnboundedReceiver<<Env as GadgetEnvironment>::ProtocolMessage>,
320                additional_params: Self::AsyncProtocolParameters,
321            ) -> Result<BuiltExecutableJobWrapper, JobError> {
322                $proto_gen_path(
323                    self,
324                    associated_block_id,
325                    associated_retry_id,
326                    associated_session_id,
327                    associated_task_id,
328                    protocol_message_rx,
329                    additional_params,
330                )
331                .await
332            }
333
334            fn internal_network(&self) -> &Self::Network {
335                &self.network_inner
336            }
337
338            async fn create_next_job(
339                &self,
340                job: JobInitMetadata,
341                work_manager: &ProtocolWorkManager<<Env as GadgetEnvironment>::WorkManager>,
342            ) -> Result<Self::AsyncProtocolParameters, Error> {
343                $create_job_path(self, job, work_manager).await
344            }
345
346            fn account_id(&self) -> &sp_core::sr25519::Public {
347                &self.account_id
348            }
349
350            fn name(&self) -> String {
351                $name.to_string()
352            }
353
354            fn role_filter(&self, role: roles::RoleType) -> bool {
355                $(
356                    if matches!(role, $role_filter) {
357                        return true;
358                    }
359                )*
360
361                false
362            }
363
364            fn phase_filter(
365                &self,
366                job: jobs::JobType<AccountId32, MaxParticipants, MaxSubmissionLen, MaxAdditionalParamsLen>,
367            ) -> bool {
368                matches!(job, $phase_filter)
369            }
370
371            fn jobs_client(&self) -> &SharedOptional<JobsClient<Env>> {
372                &self.jobs_client
373            }
374
375            fn tx_manager(&self) -> <Env as GadgetEnvironment>::TransactionManager {
376                self.tx_manager.clone()
377            }
378
379            fn logger(&self) -> DebugLogger {
380                self.logger.clone()
381            }
382
383            fn key_store(&self) -> &ECDSAKeyStore<Self::KeystoreBackend> {
384                &self.key_store
385            }
386
387            fn client(&self) -> <Env as GadgetEnvironment>::Client {
388                self.client.clone()
389            }
390        }
391    };
392}