1use crate::config::ProtocolConfig;
2use crate::environments::EventMetadata;
3use crate::module::network::Network;
4use crate::module::{GadgetProtocol, GeneralModule};
5use crate::prelude::PrometheusConfig;
6use gadget_core::gadget::general::GeneralGadget;
7use gadget_core::gadget::manager::{AbstractGadget, GadgetError, GadgetManager};
8pub use gadget_core::job::JobError;
9pub use gadget_core::job::*;
10pub use gadget_core::job_manager::WorkManagerInterface;
11pub use gadget_core::job_manager::{PollMethod, ProtocolWorkManager, WorkManagerError};
12use gadget_io::tokio::task::JoinError;
13use parking_lot::RwLock;
14use sp_core::ecdsa;
15use std::fmt::{Debug, Display, Formatter};
16use std::sync::Arc;
17
18pub use subxt_signer;
19pub mod environments;
20use crate::environments::GadgetEnvironment;
21use gadget_core::gadget::general::Client;
22pub mod module;
23
24#[allow(ambiguous_glob_reexports)]
25pub mod prelude {
26 pub use crate::client::*;
27 pub use crate::config::*;
28 pub use crate::environments::*;
29 pub use crate::full_protocol::{FullProtocolConfig, NodeInput};
30 pub use crate::generate_setup_and_run_command;
31 pub use crate::keystore::{ECDSAKeyStore, InMemoryBackend, KeystoreBackend};
32 pub use crate::module::WorkManagerConfig;
33 pub use crate::{BuiltExecutableJobWrapper, JobBuilder, JobError, WorkManagerInterface};
34 pub use async_trait::async_trait;
35 pub use gadget_core::job_manager::ProtocolWorkManager;
36 pub use gadget_core::job_manager::SendFuture;
37 pub use gadget_core::job_manager::WorkManagerError;
38 pub use gadget_io::tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
39 pub use parking_lot::Mutex;
40 pub use sp_runtime::traits::Block;
41 pub use std::pin::Pin;
42 pub use std::sync::Arc;
43}
44
45pub use async_trait;
47pub use color_eyre;
48pub use gadget_io;
49pub use tangle_subxt;
50
51pub mod tangle_runtime {
52 pub use tangle_subxt::subxt::utils::AccountId32;
53 pub use tangle_subxt::tangle_testnet_runtime::api;
54 pub use tangle_subxt::tangle_testnet_runtime::api::runtime_types::{
55 bounded_collections::bounded_vec::BoundedVec,
56 };
57}
58
59pub mod channels;
60pub mod client;
61pub mod config;
62pub mod debug_logger;
63pub mod full_protocol;
64pub mod helpers;
65pub mod keystore;
66pub mod locks;
67pub mod prometheus;
68pub mod protocol;
69pub mod tracer;
70pub mod utils;
71
72#[derive(Debug)]
73pub enum Error {
74 RegistryCreateError { err: String },
75 RegistrySendError { err: String },
76 RegistryRecvError { err: String },
77 RegistrySerializationError { err: String },
78 RegistryListenError { err: String },
79 GadgetManagerError { err: GadgetError },
80 InitError { err: String },
81 WorkManagerError { err: WorkManagerError },
82 ProtocolRemoteError { err: String },
83 ClientError { err: String },
84 JobError { err: JobError },
85 NetworkError { err: String },
86 KeystoreError { err: String },
87 MissingNetworkId,
88 PeerNotFound { id: ecdsa::Public },
89 JoinError { err: JoinError },
90 ParticipantNotSelected { id: ecdsa::Public, reason: String },
91 PrometheusError { err: String },
92 Other { err: String },
93}
94
95impl From<String> for crate::Error {
96 fn from(err: String) -> Self {
97 Self::Other { err }
98 }
99}
100
101impl Display for Error {
102 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
103 Debug::fmt(self, f)
104 }
105}
106
107impl std::error::Error for Error {}
108
109impl From<JobError> for Error {
110 fn from(err: JobError) -> Self {
111 Error::JobError { err }
112 }
113}
114
115pub async fn run_protocol<Env: GadgetEnvironment, T: ProtocolConfig<Env>>(
116 mut protocol_config: T,
117) -> Result<(), Error> {
118 let client = protocol_config.take_client();
119 let network = protocol_config.take_network();
120 let protocol = protocol_config.take_protocol();
121
122 let prometheus_config = protocol_config.prometheus_config();
123
124 let latest_finality_notification = get_latest_event_from_client::<Env>(&client).await?;
126 let work_manager = create_work_manager(&latest_finality_notification, &protocol).await?;
127 let proto_module = GeneralModule::new(network.clone(), protocol, work_manager);
128 let substrate_gadget = GeneralGadget::new(client, proto_module);
130 let network_future = network.run();
131 let gadget_future = async move {
132 if let Err(err) = substrate_gadget
135 .on_event_received(latest_finality_notification)
136 .await
137 {
138 substrate_gadget.process_error(err).await;
139 }
140
141 GadgetManager::new(substrate_gadget)
142 .await
143 .map_err(|err| Error::GadgetManagerError { err })
144 };
145
146 if let Err(err) = prometheus::setup(prometheus_config.clone()).await {
147 protocol_config
148 .logger()
149 .warn(format!("Error setting up prometheus: {err:?}"));
150 } else if let PrometheusConfig::Enabled { bind_addr } = prometheus_config {
151 protocol_config
152 .logger()
153 .info(format!("Prometheus enabled on {bind_addr}"));
154 }
155
156 gadget_io::tokio::try_join!(network_future, gadget_future).map(|_| ())
158}
159
160pub async fn create_work_manager<Env: GadgetEnvironment, P: GadgetProtocol<Env>>(
162 latest_event: &<Env as GadgetEnvironment>::Event,
163 protocol: &P,
164) -> Result<ProtocolWorkManager<<Env as GadgetEnvironment>::WorkManager>, Error> {
165 let now = latest_event.number();
166
167 let work_manager_config = protocol.get_work_manager_config();
168
169 let clock = Arc::new(RwLock::new(Some(now)));
170
171 let job_manager = protocol.generate_work_manager(clock.clone()).await;
172
173 let poll_method = match work_manager_config.interval {
174 Some(interval) => PollMethod::Interval {
175 millis: interval.as_millis() as u64,
176 },
177 None => PollMethod::Manual,
178 };
179
180 Ok(ProtocolWorkManager::new(
181 job_manager,
182 work_manager_config.max_active_tasks,
183 work_manager_config.max_pending_tasks,
184 poll_method,
185 ))
186}
187
188async fn get_latest_event_from_client<Env: GadgetEnvironment>(
189 client: &<Env as GadgetEnvironment>::Client,
190) -> Result<<Env as GadgetEnvironment>::Event, Error> {
191 Client::<Env::Event>::latest_event(client)
192 .await
193 .ok_or_else(|| Error::InitError {
194 err: "No event received".to_string(),
195 })
196}
197
/// Generates `setup_node` and `run` functions that start one protocol per listed config.
///
/// Each `$config` identifier is resolved as `crate::$config` in the crate that invokes
/// the macro. The configs are constructed in the order given; the i-th config receives
/// the i-th client and the i-th network, and all of them share clones of the remaining
/// inputs. The resulting `execute()` futures are driven concurrently and `run` returns
/// the first error any of them produces.
///
/// `setup_node` wraps `run` and logs a returned error through the node's logger
/// instead of propagating it.
///
/// # Errors
///
/// `run` returns `Error::InitError` when fewer clients or networks than configs are
/// supplied, and otherwise forwards the error of a failed `new` or `execute` call.
///
/// # Call-site requirements
///
/// The expansion refers to `GadgetEnvironment`, `Network`, `NodeInput`, `SendFuture`,
/// `DebugLogger`, `ECDSAKeyStore` and `Error` by their unqualified names, and to the
/// `futures` and `sp_core` crates by path, so all of them must be available where the
/// macro is invoked.
#[macro_export]
// `crate::$config` is meant to resolve in the invoking crate, not in this one, so the
// plain `crate` keyword (rather than `$crate`) is intentional there.
#[allow(clippy::crate_in_macro_def)]
macro_rules! generate_setup_and_run_command {
    ($( $config:ident ),*) => {
        pub fn setup_node<Env: GadgetEnvironment, N: Network<Env>, KBE: $crate::keystore::KeystoreBackend, D: Send + Clone + 'static>(node_input: NodeInput<Env, N, KBE, D>) -> impl SendFuture<'static, ()>
        {
            async move {
                if let Err(err) = run(
                    node_input.clients,
                    node_input.tx_manager,
                    node_input.networks,
                    node_input.logger.clone(),
                    node_input.account_id,
                    node_input.keystore,
                    node_input.prometheus_config,
                )
                .await
                {
                    node_input
                        .logger
                        .error(format!("Error running gadget: {:?}", err));
                }
            }
        }

        pub async fn run<Env: GadgetEnvironment, N: Network<Env>, KBE: $crate::keystore::KeystoreBackend>(
            client: Vec<Env::Client>,
            tx_manager: <Env as GadgetEnvironment>::TransactionManager,
            networks: Vec<N>,
            logger: DebugLogger,
            account_id: sp_core::sr25519::Public,
            key_store: ECDSAKeyStore<KBE>,
            prometheus_config: $crate::prometheus::PrometheusConfig,
        ) -> Result<(), Error>
        {
            use futures::TryStreamExt;
            let futures = futures::stream::FuturesUnordered::new();
            // Queues, so that each config takes the next client/network in input order.
            let mut networks: std::collections::VecDeque<_> = networks.into_iter().collect();
            let mut clients: std::collections::VecDeque<_> = client.into_iter().collect();

            $(
                // Running out of clients or networks is a caller-side configuration
                // mistake; report it as an error instead of panicking.
                let next_client = clients.pop_front().ok_or_else(|| $crate::Error::InitError {
                    err: "Not enough clients".to_string(),
                })?;
                let next_network = networks.pop_front().ok_or_else(|| $crate::Error::InitError {
                    err: "Not enough networks".to_string(),
                })?;
                let config = crate::$config::new(next_client, tx_manager.clone(), next_network, logger.clone(), account_id.clone(), key_store.clone(), prometheus_config.clone()).await?;
                futures.push(Box::pin(config.execute()) as std::pin::Pin<Box<dyn SendFuture<'static, Result<(), $crate::Error>>>>);
            )*

            // Drive every protocol concurrently; the first failure ends the run.
            futures.try_collect::<Vec<_>>().await.map(|_| ())
        }
    };
}
254
/// Generates a protocol struct named `$struct_name` together with its
/// `FullProtocolConfig<Env>` implementation.
///
/// Arguments, in order:
/// - `$name` (expr): returned by `name()` via `to_string()`; its stringified form,
///   with double quotes removed, is used to label the logger.
/// - `$struct_name` (ident): name of the generated struct.
/// - `$async_proto_params` (ty): becomes the `AsyncProtocolParameters` associated type.
/// - `$proto_gen_path` (expr): async callable invoked by `generate_protocol_from` with
///   `self` followed by the forwarded arguments.
/// - `$create_job_path` (expr): async callable invoked by `create_next_job` as
///   `(self, job, work_manager)`.
/// - `$phase_filter` (pat): pattern a job must match for `phase_filter` to return `true`.
/// - `$role_filter` (pat, repeated): `role_filter` returns `true` when the role matches
///   any of these patterns.
///
/// The expansion refers to many items by unqualified name (for example `protocol`,
/// `async_trait`, `GadgetEnvironment`, `Network`, `KeystoreBackend`, `DebugLogger`,
/// `ECDSAKeyStore`, `JobsClient`, `FullProtocolConfig`, `Error`, `roles`, `jobs`), so
/// they must be in scope where the macro is invoked.
#[macro_export]
macro_rules! generate_protocol {
    ($name:expr, $struct_name:ident, $async_proto_params:ty, $proto_gen_path:expr, $create_job_path:expr, $phase_filter:pat, $( $role_filter:pat ),*) => {
        // The struct is passed through the `protocol` attribute macro resolved at the
        // call site; what that attribute adds is not visible from this file.
        #[protocol]
        pub struct $struct_name<
            Env: GadgetEnvironment,
            N: Network<Env>,
            KBE: KeystoreBackend,
        > {
            tx_manager: <Env as GadgetEnvironment>::TransactionManager,
            logger: DebugLogger,
            client: <Env as GadgetEnvironment>::Client,
            network_inner: N,
            account_id: sp_core::sr25519::Public,
            key_store: ECDSAKeyStore<KBE>,
            // Starts out as `None` (see `new`); nothing in this macro fills it in.
            jobs_client: Arc<Mutex<Option<JobsClient<Env>>>>,
            prometheus_config: $crate::prometheus::PrometheusConfig,
        }

        #[async_trait]
        impl<
            Env: GadgetEnvironment,
            N: Network<Env>,
            KBE: KeystoreBackend,
        > FullProtocolConfig<Env> for $struct_name<Env, N, KBE>
        {
            type AsyncProtocolParameters = $async_proto_params;
            type Network = N;
            type AdditionalNodeParameters = ();
            type KeystoreBackend = KBE;

            // Stores the given components and derives the logger label from `$name`.
            // Always returns `Ok`.
            async fn new(
                client: <Env as GadgetEnvironment>::Client,
                tx_manager: <Env as GadgetEnvironment>::TransactionManager,
                network_inner: Self::Network,
                logger: DebugLogger,
                account_id: sp_core::sr25519::Public,
                key_store: ECDSAKeyStore<Self::KeystoreBackend>,
                prometheus_config: $crate::prometheus::PrometheusConfig,
            ) -> Result<Self, Error> {
                // `stringify!` keeps the quotes of a string literal, hence the `replace`.
                // An empty incoming id is replaced by the protocol name; otherwise the
                // name is appended after " | ".
                let logger = if logger.id.is_empty() {
                    DebugLogger { id: stringify!($name).replace("\"", "").into() }
                } else {
                    DebugLogger { id: (logger.id + " | " + stringify!($name)).replace("\"", "") }
                };
                Ok(Self {
                    tx_manager,
                    logger,
                    client,
                    network_inner,
                    account_id,
                    key_store,
                    prometheus_config,
                    jobs_client: Arc::new(parking_lot::Mutex::new(None)),
                })
            }

            // Delegates to `$proto_gen_path`, forwarding every argument unchanged.
            async fn generate_protocol_from(
                &self,
                associated_block_id: <<Env as GadgetEnvironment>::WorkManager as WorkManagerInterface>::Clock,
                associated_retry_id: <<Env as GadgetEnvironment>::WorkManager as WorkManagerInterface>::RetryID,
                associated_session_id: <<Env as GadgetEnvironment>::WorkManager as WorkManagerInterface>::SessionID,
                associated_task_id: <<Env as GadgetEnvironment>::WorkManager as WorkManagerInterface>::TaskID,
                protocol_message_rx: UnboundedReceiver<<Env as GadgetEnvironment>::ProtocolMessage>,
                additional_params: Self::AsyncProtocolParameters,
            ) -> Result<BuiltExecutableJobWrapper, JobError> {
                $proto_gen_path(
                    self,
                    associated_block_id,
                    associated_retry_id,
                    associated_session_id,
                    associated_task_id,
                    protocol_message_rx,
                    additional_params,
                )
                .await
            }

            // Borrows the network handle stored in this protocol.
            fn internal_network(&self) -> &Self::Network {
                &self.network_inner
            }

            // Delegates to `$create_job_path`.
            async fn create_next_job(
                &self,
                job: JobInitMetadata,
                work_manager: &ProtocolWorkManager<<Env as GadgetEnvironment>::WorkManager>,
            ) -> Result<Self::AsyncProtocolParameters, Error> {
                $create_job_path(self, job, work_manager).await
            }

            fn account_id(&self) -> &sp_core::sr25519::Public {
                &self.account_id
            }

            // Unlike the logger label, this uses the value of `$name`, not its
            // stringified source text.
            fn name(&self) -> String {
                $name.to_string()
            }

            // `true` as soon as `role` matches one of the `$role_filter` patterns;
            // `false` when none match (including when no patterns were supplied).
            fn role_filter(&self, role: roles::RoleType) -> bool {
                $(
                    if matches!(role, $role_filter) {
                        return true;
                    }
                )*

                false
            }

            // `true` when `job` matches the single `$phase_filter` pattern.
            fn phase_filter(
                &self,
                job: jobs::JobType<AccountId32, MaxParticipants, MaxSubmissionLen, MaxAdditionalParamsLen>,
            ) -> bool {
                matches!(job, $phase_filter)
            }

            fn jobs_client(&self) -> &SharedOptional<JobsClient<Env>> {
                &self.jobs_client
            }

            // The accessors below return clones of the stored handles.
            fn tx_manager(&self) -> <Env as GadgetEnvironment>::TransactionManager {
                self.tx_manager.clone()
            }

            fn logger(&self) -> DebugLogger {
                self.logger.clone()
            }

            fn key_store(&self) -> &ECDSAKeyStore<Self::KeystoreBackend> {
                &self.key_store
            }

            fn client(&self) -> <Env as GadgetEnvironment>::Client {
                self.client.clone()
            }
        }
    };
}