blueprint_manager/executor/
mod.rs1use crate::config::BlueprintManagerConfig;
2use crate::gadget::ActiveGadgets;
3use crate::sdk::entry::SendFuture;
4use crate::sdk::utils;
5use crate::sdk::utils::msg_to_error;
6use color_eyre::eyre::OptionExt;
7use color_eyre::Report;
8use gadget_io::GadgetConfig;
9use gadget_sdk::clients::tangle::runtime::{TangleConfig, TangleRuntimeClient};
10use gadget_sdk::clients::tangle::services::{RpcServicesWithBlueprint, ServicesClient};
11use gadget_sdk::clients::Client;
12use gadget_sdk::info;
13use gadget_sdk::keystore::backend::fs::FilesystemKeystore;
14use gadget_sdk::keystore::backend::GenericKeyStore;
15use gadget_sdk::keystore::{BackendExt, TanglePairSigner};
16use sp_core::H256;
17use std::collections::HashMap;
18use std::future::Future;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21use tangle_subxt::subxt::blocks::BlockRef;
22use tangle_subxt::subxt::tx::Signer;
23use tangle_subxt::subxt::utils::AccountId32;
24use tangle_subxt::subxt::Config;
25use tokio::task::JoinHandle;
26
27pub(crate) mod event_handler;
28
29pub async fn get_blueprints<C: Config>(
30 runtime: &ServicesClient<C>,
31 block_hash: [u8; 32],
32 account_id: AccountId32,
33) -> color_eyre::Result<Vec<RpcServicesWithBlueprint>>
34where
35 BlockRef<<C as Config>::Hash>: From<BlockRef<H256>>,
36{
37 runtime
38 .query_operator_blueprints(block_hash, account_id)
39 .await
40 .map_err(|err| msg_to_error(err.to_string()))
41}
42
43pub struct BlueprintManagerHandle {
44 shutdown_call: Option<tokio::sync::oneshot::Sender<()>>,
45 start_tx: Option<tokio::sync::oneshot::Sender<()>>,
46 running_task: JoinHandle<color_eyre::Result<()>>,
47 span: tracing::Span,
48 sr25519_id: TanglePairSigner<sp_core::sr25519::Pair>,
49 ecdsa_id: gadget_sdk::keystore::TanglePairSigner<sp_core::ecdsa::Pair>,
50 keystore_uri: String,
51}
52
53impl BlueprintManagerHandle {
54 pub fn start(&mut self) -> color_eyre::Result<()> {
56 let _span = self.span.enter();
57 match self.start_tx.take() {
58 Some(tx) => match tx.send(()) {
59 Ok(_) => {
60 info!("Start signal sent to Blueprint Manager");
61 Ok(())
62 }
63 Err(_) => Err(Report::msg(
64 "Failed to send start signal to Blueprint Manager",
65 )),
66 },
67 None => Err(Report::msg("Blueprint Manager Already Started")),
68 }
69 }
70
71 pub fn sr25519_id(&self) -> &TanglePairSigner<sp_core::sr25519::Pair> {
73 &self.sr25519_id
74 }
75
76 pub fn ecdsa_id(&self) -> &gadget_sdk::keystore::TanglePairSigner<sp_core::ecdsa::Pair> {
78 &self.ecdsa_id
79 }
80
81 pub async fn shutdown(&mut self) -> color_eyre::Result<()> {
83 self.shutdown_call
84 .take()
85 .map(|tx| tx.send(()))
86 .ok_or_eyre("Shutdown already called")?
87 .map_err(|_| Report::msg("Failed to send shutdown signal to Blueprint Manager"))
88 }
89
90 pub fn keystore_uri(&self) -> &str {
92 &self.keystore_uri
93 }
94
95 pub fn span(&self) -> &tracing::Span {
96 &self.span
97 }
98}
99
100impl Drop for BlueprintManagerHandle {
104 fn drop(&mut self) {
105 let _ = self.start();
106 }
107}
108
109impl Future for BlueprintManagerHandle {
112 type Output = color_eyre::Result<()>;
113
114 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115 let this = self.get_mut();
117 if this.start_tx.is_some() {
118 if let Err(err) = this.start() {
119 return Poll::Ready(Err(err));
120 }
121 }
122
123 let result = futures::ready!(Pin::new(&mut this.running_task).poll(cx));
124
125 match result {
126 Ok(res) => Poll::Ready(res),
127 Err(err) => Poll::Ready(Err(Report::msg(format!(
128 "Blueprint Manager Closed Unexpectedly (JoinError): {err:?}"
129 )))),
130 }
131 }
132}
133
134pub async fn run_blueprint_manager<F: SendFuture<'static, ()>>(
135 blueprint_manager_config: BlueprintManagerConfig,
136 gadget_config: GadgetConfig,
137 shutdown_cmd: F,
138) -> color_eyre::Result<BlueprintManagerHandle> {
139 let logger_id = if let Some(custom_id) = &blueprint_manager_config.instance_id {
140 custom_id.as_str()
141 } else {
142 "Local"
143 };
144
145 let span = tracing::info_span!("Blueprint-Manager", id = logger_id);
146
147 let _span = span.enter();
148 info!("Starting blueprint manager ... waiting for start signal ...");
149
150 let data_dir = &blueprint_manager_config.data_dir;
151 if !data_dir.exists() {
152 info!(
153 "Data directory does not exist, creating it at `{}`",
154 data_dir.display()
155 );
156 std::fs::create_dir_all(data_dir)?;
157 }
158
159 let (tangle_key, ecdsa_key) = {
160 let keystore = GenericKeyStore::<parking_lot::RawRwLock>::Fs(FilesystemKeystore::open(
161 &gadget_config.keystore_uri,
162 )?);
163 let sr_key = keystore.sr25519_key()?;
164 let ecdsa_key = keystore.ecdsa_key()?;
165 (sr_key, ecdsa_key)
166 };
167
168 let sub_account_id = tangle_key.account_id().clone();
169
170 let tangle_client =
171 TangleRuntimeClient::from_url(gadget_config.ws_rpc_url.as_str(), sub_account_id.clone())
172 .await?;
173 let services_client = ServicesClient::new(tangle_client.client());
174 let mut active_gadgets = HashMap::new();
175
176 let keystore_uri = gadget_config.keystore_uri.clone();
177
178 let manager_task = async move {
179 let mut operator_subscribed_blueprints = handle_init(
184 &tangle_client,
185 &services_client,
186 &sub_account_id,
187 &mut active_gadgets,
188 &gadget_config,
189 &blueprint_manager_config,
190 )
191 .await?;
192
193 while let Some(event) = tangle_client.next_event().await {
196 let result = event_handler::check_blueprint_events(
197 &event,
198 &mut active_gadgets,
199 &sub_account_id.clone(),
200 )
201 .await;
202
203 if result.needs_update {
204 operator_subscribed_blueprints = services_client
205 .query_operator_blueprints(event.hash, sub_account_id.clone())
206 .await
207 .map_err(|err| msg_to_error(err.to_string()))?;
208 }
209
210 event_handler::handle_tangle_event(
211 &event,
212 &operator_subscribed_blueprints,
213 &gadget_config,
214 &blueprint_manager_config,
215 &mut active_gadgets,
216 result,
217 &services_client,
218 )
219 .await?;
220 }
221
222 Err::<(), _>(utils::msg_to_error("Finality Notification stream died"))
223 };
224
225 let (tx_stop, rx_stop) = tokio::sync::oneshot::channel::<()>();
226
227 let shutdown_task = async move {
228 tokio::select! {
229 _res0 = shutdown_cmd => {
230 info!("Shutdown-1 command received, closing application");
231 },
232
233 _res1 = rx_stop => {
234 info!("Manual shutdown signal received, closing application");
235 }
236 }
237 };
238
239 let (start_tx, start_rx) = tokio::sync::oneshot::channel::<()>();
240
241 let combined_task = async move {
242 start_rx
243 .await
244 .map_err(|_err| Report::msg("Failed to receive start signal"))?;
245
246 tokio::select! {
247 res0 = manager_task => {
248 Err(Report::msg(format!("Blueprint Manager Closed Unexpectedly: {res0:?}")))
249 },
250
251 _ = shutdown_task => {
252 Ok(())
253 }
254 }
255 };
256
257 drop(_span);
258 let handle = tokio::spawn(combined_task);
259
260 let handle = BlueprintManagerHandle {
261 start_tx: Some(start_tx),
262 shutdown_call: Some(tx_stop),
263 running_task: handle,
264 span,
265 sr25519_id: tangle_key,
266 ecdsa_id: ecdsa_key,
267 keystore_uri,
268 };
269
270 Ok(handle)
271}
272
273async fn handle_init(
278 tangle_runtime: &TangleRuntimeClient,
279 services_client: &ServicesClient<TangleConfig>,
280 sub_account_id: &AccountId32,
281 active_gadgets: &mut ActiveGadgets,
282 gadget_config: &GadgetConfig,
283 blueprint_manager_config: &BlueprintManagerConfig,
284) -> color_eyre::Result<Vec<RpcServicesWithBlueprint>> {
285 info!("Beginning initialization of Blueprint Manager");
286
287 let (operator_subscribed_blueprints, init_event) =
288 if let Some(event) = tangle_runtime.next_event().await {
289 (
290 get_blueprints(services_client, event.hash, sub_account_id.clone())
291 .await
292 .map_err(|err| Report::msg(format!("Failed to obtain blueprints: {err}")))?,
293 event,
294 )
295 } else {
296 return Err(Report::msg("Failed to get initial block hash"));
297 };
298
299 info!(
300 "Received {} initial blueprints this operator is registered to",
301 operator_subscribed_blueprints.len()
302 );
303
304 let poll_result =
306 event_handler::check_blueprint_events(&init_event, active_gadgets, sub_account_id).await;
307
308 event_handler::handle_tangle_event(
309 &init_event,
310 &operator_subscribed_blueprints,
311 gadget_config,
312 blueprint_manager_config,
313 active_gadgets,
314 poll_result,
315 services_client,
316 )
317 .await?;
318
319 Ok(operator_subscribed_blueprints)
320}