blueprint_manager/executor/
mod.rs

1use crate::config::BlueprintManagerConfig;
2use crate::gadget::ActiveGadgets;
3use crate::sdk::entry::SendFuture;
4use crate::sdk::utils;
5use crate::sdk::utils::msg_to_error;
6use color_eyre::eyre::OptionExt;
7use color_eyre::Report;
8use gadget_io::GadgetConfig;
9use gadget_sdk::clients::tangle::runtime::{TangleConfig, TangleRuntimeClient};
10use gadget_sdk::clients::tangle::services::{RpcServicesWithBlueprint, ServicesClient};
11use gadget_sdk::clients::Client;
12use gadget_sdk::info;
13use gadget_sdk::keystore::backend::fs::FilesystemKeystore;
14use gadget_sdk::keystore::backend::GenericKeyStore;
15use gadget_sdk::keystore::{BackendExt, TanglePairSigner};
16use sp_core::H256;
17use std::collections::HashMap;
18use std::future::Future;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21use tangle_subxt::subxt::blocks::BlockRef;
22use tangle_subxt::subxt::tx::Signer;
23use tangle_subxt::subxt::utils::AccountId32;
24use tangle_subxt::subxt::Config;
25use tokio::task::JoinHandle;
26
27pub(crate) mod event_handler;
28
29pub async fn get_blueprints<C: Config>(
30    runtime: &ServicesClient<C>,
31    block_hash: [u8; 32],
32    account_id: AccountId32,
33) -> color_eyre::Result<Vec<RpcServicesWithBlueprint>>
34where
35    BlockRef<<C as Config>::Hash>: From<BlockRef<H256>>,
36{
37    runtime
38        .query_operator_blueprints(block_hash, account_id)
39        .await
40        .map_err(|err| msg_to_error(err.to_string()))
41}
42
43pub struct BlueprintManagerHandle {
44    shutdown_call: Option<tokio::sync::oneshot::Sender<()>>,
45    start_tx: Option<tokio::sync::oneshot::Sender<()>>,
46    running_task: JoinHandle<color_eyre::Result<()>>,
47    span: tracing::Span,
48    sr25519_id: TanglePairSigner<sp_core::sr25519::Pair>,
49    ecdsa_id: gadget_sdk::keystore::TanglePairSigner<sp_core::ecdsa::Pair>,
50    keystore_uri: String,
51}
52
53impl BlueprintManagerHandle {
54    /// Send a start signal to the blueprint manager
55    pub fn start(&mut self) -> color_eyre::Result<()> {
56        let _span = self.span.enter();
57        match self.start_tx.take() {
58            Some(tx) => match tx.send(()) {
59                Ok(_) => {
60                    info!("Start signal sent to Blueprint Manager");
61                    Ok(())
62                }
63                Err(_) => Err(Report::msg(
64                    "Failed to send start signal to Blueprint Manager",
65                )),
66            },
67            None => Err(Report::msg("Blueprint Manager Already Started")),
68        }
69    }
70
71    /// Returns the SR25519 keypair for this blueprint manager
72    pub fn sr25519_id(&self) -> &TanglePairSigner<sp_core::sr25519::Pair> {
73        &self.sr25519_id
74    }
75
76    /// Returns the ECDSA keypair for this blueprint manager
77    pub fn ecdsa_id(&self) -> &gadget_sdk::keystore::TanglePairSigner<sp_core::ecdsa::Pair> {
78        &self.ecdsa_id
79    }
80
81    /// Shutdown the blueprint manager
82    pub async fn shutdown(&mut self) -> color_eyre::Result<()> {
83        self.shutdown_call
84            .take()
85            .map(|tx| tx.send(()))
86            .ok_or_eyre("Shutdown already called")?
87            .map_err(|_| Report::msg("Failed to send shutdown signal to Blueprint Manager"))
88    }
89
90    /// Returns the keystore URI for this blueprint manager
91    pub fn keystore_uri(&self) -> &str {
92        &self.keystore_uri
93    }
94
95    pub fn span(&self) -> &tracing::Span {
96        &self.span
97    }
98}
99
100/// Add default behavior for unintentional dropping of the BlueprintManagerHandle
101/// This will ensure that the BlueprintManagerHandle is executed even if the handle
102/// is dropped, which is similar behavior to the tokio SpawnHandle
103impl Drop for BlueprintManagerHandle {
104    fn drop(&mut self) {
105        let _ = self.start();
106    }
107}
108
109/// Implement the Future trait for the BlueprintManagerHandle to allow
110/// for the handle to be awaited on
111impl Future for BlueprintManagerHandle {
112    type Output = color_eyre::Result<()>;
113
114    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
115        // Start the blueprint manager if it has not been started
116        let this = self.get_mut();
117        if this.start_tx.is_some() {
118            if let Err(err) = this.start() {
119                return Poll::Ready(Err(err));
120            }
121        }
122
123        let result = futures::ready!(Pin::new(&mut this.running_task).poll(cx));
124
125        match result {
126            Ok(res) => Poll::Ready(res),
127            Err(err) => Poll::Ready(Err(Report::msg(format!(
128                "Blueprint Manager Closed Unexpectedly (JoinError): {err:?}"
129            )))),
130        }
131    }
132}
133
134pub async fn run_blueprint_manager<F: SendFuture<'static, ()>>(
135    blueprint_manager_config: BlueprintManagerConfig,
136    gadget_config: GadgetConfig,
137    shutdown_cmd: F,
138) -> color_eyre::Result<BlueprintManagerHandle> {
139    let logger_id = if let Some(custom_id) = &blueprint_manager_config.instance_id {
140        custom_id.as_str()
141    } else {
142        "Local"
143    };
144
145    let span = tracing::info_span!("Blueprint-Manager", id = logger_id);
146
147    let _span = span.enter();
148    info!("Starting blueprint manager ... waiting for start signal ...");
149
150    let data_dir = &blueprint_manager_config.data_dir;
151    if !data_dir.exists() {
152        info!(
153            "Data directory does not exist, creating it at `{}`",
154            data_dir.display()
155        );
156        std::fs::create_dir_all(data_dir)?;
157    }
158
159    let (tangle_key, ecdsa_key) = {
160        let keystore = GenericKeyStore::<parking_lot::RawRwLock>::Fs(FilesystemKeystore::open(
161            &gadget_config.keystore_uri,
162        )?);
163        let sr_key = keystore.sr25519_key()?;
164        let ecdsa_key = keystore.ecdsa_key()?;
165        (sr_key, ecdsa_key)
166    };
167
168    let sub_account_id = tangle_key.account_id().clone();
169
170    let tangle_client =
171        TangleRuntimeClient::from_url(gadget_config.ws_rpc_url.as_str(), sub_account_id.clone())
172            .await?;
173    let services_client = ServicesClient::new(tangle_client.client());
174    let mut active_gadgets = HashMap::new();
175
176    let keystore_uri = gadget_config.keystore_uri.clone();
177
178    let manager_task = async move {
179        // With the basics setup, we must now implement the main logic of the Blueprint Manager
180        // Handle initialization logic
181        // NOTE: The node running this code should be registered as an operator for the blueprints, otherwise, this
182        // code will fail
183        let mut operator_subscribed_blueprints = handle_init(
184            &tangle_client,
185            &services_client,
186            &sub_account_id,
187            &mut active_gadgets,
188            &gadget_config,
189            &blueprint_manager_config,
190        )
191        .await?;
192
193        // Now, run the main event loop
194        // Listen to FinalityNotifications and poll for new/deleted services that correspond to the blueprints above
195        while let Some(event) = tangle_client.next_event().await {
196            let result = event_handler::check_blueprint_events(
197                &event,
198                &mut active_gadgets,
199                &sub_account_id.clone(),
200            )
201            .await;
202
203            if result.needs_update {
204                operator_subscribed_blueprints = services_client
205                    .query_operator_blueprints(event.hash, sub_account_id.clone())
206                    .await
207                    .map_err(|err| msg_to_error(err.to_string()))?;
208            }
209
210            event_handler::handle_tangle_event(
211                &event,
212                &operator_subscribed_blueprints,
213                &gadget_config,
214                &blueprint_manager_config,
215                &mut active_gadgets,
216                result,
217                &services_client,
218            )
219            .await?;
220        }
221
222        Err::<(), _>(utils::msg_to_error("Finality Notification stream died"))
223    };
224
225    let (tx_stop, rx_stop) = tokio::sync::oneshot::channel::<()>();
226
227    let shutdown_task = async move {
228        tokio::select! {
229            _res0 = shutdown_cmd => {
230                info!("Shutdown-1 command received, closing application");
231            },
232
233            _res1 = rx_stop => {
234                info!("Manual shutdown signal received, closing application");
235            }
236        }
237    };
238
239    let (start_tx, start_rx) = tokio::sync::oneshot::channel::<()>();
240
241    let combined_task = async move {
242        start_rx
243            .await
244            .map_err(|_err| Report::msg("Failed to receive start signal"))?;
245
246        tokio::select! {
247            res0 = manager_task => {
248                Err(Report::msg(format!("Blueprint Manager Closed Unexpectedly: {res0:?}")))
249            },
250
251            _ = shutdown_task => {
252                Ok(())
253            }
254        }
255    };
256
257    drop(_span);
258    let handle = tokio::spawn(combined_task);
259
260    let handle = BlueprintManagerHandle {
261        start_tx: Some(start_tx),
262        shutdown_call: Some(tx_stop),
263        running_task: handle,
264        span,
265        sr25519_id: tangle_key,
266        ecdsa_id: ecdsa_key,
267        keystore_uri,
268    };
269
270    Ok(handle)
271}
272
273/// * Query to get Vec<RpcServicesWithBlueprint>
274/// * For each RpcServicesWithBlueprint, fetch the associated gadget binary (fetch/download)
275///   -> If the services field is empty, just emit and log inside the executed binary "that states a new service instance got created by one of these blueprints"
276///   -> If the services field is not empty, for each service in RpcServicesWithBlueprint.services, spawn the gadget binary, using params to set the job type to listen to (in terms of our old language, each spawned service represents a single "RoleType")
277async fn handle_init(
278    tangle_runtime: &TangleRuntimeClient,
279    services_client: &ServicesClient<TangleConfig>,
280    sub_account_id: &AccountId32,
281    active_gadgets: &mut ActiveGadgets,
282    gadget_config: &GadgetConfig,
283    blueprint_manager_config: &BlueprintManagerConfig,
284) -> color_eyre::Result<Vec<RpcServicesWithBlueprint>> {
285    info!("Beginning initialization of Blueprint Manager");
286
287    let (operator_subscribed_blueprints, init_event) =
288        if let Some(event) = tangle_runtime.next_event().await {
289            (
290                get_blueprints(services_client, event.hash, sub_account_id.clone())
291                    .await
292                    .map_err(|err| Report::msg(format!("Failed to obtain blueprints: {err}")))?,
293                event,
294            )
295        } else {
296            return Err(Report::msg("Failed to get initial block hash"));
297        };
298
299    info!(
300        "Received {} initial blueprints this operator is registered to",
301        operator_subscribed_blueprints.len()
302    );
303
304    // Immediately poll, handling the initial state
305    let poll_result =
306        event_handler::check_blueprint_events(&init_event, active_gadgets, sub_account_id).await;
307
308    event_handler::handle_tangle_event(
309        &init_event,
310        &operator_subscribed_blueprints,
311        gadget_config,
312        blueprint_manager_config,
313        active_gadgets,
314        poll_result,
315        services_client,
316    )
317    .await?;
318
319    Ok(operator_subscribed_blueprints)
320}