blueprint_client_tangle/
client.rs

1use sp_core::ecdsa;
2use crate::error::{Result, Error};
3use crate::EventsClient;
4use blueprint_std::sync::Arc;
5use blueprint_std::time::Duration;
6use subxt::blocks::{Block, BlockRef};
7use subxt::events::Events;
8use subxt::utils::AccountId32;
9use subxt::PolkadotConfig;
10use tangle_subxt::subxt;
11use tangle_subxt::tangle_testnet_runtime::api;
12use tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::operator::OperatorMetadata;
13use blueprint_client_core::{BlueprintServicesClient, OperatorSet};
14use blueprint_runner::config::BlueprintEnvironment;
15use blueprint_crypto_sp_core::{SpEcdsa, SpSr25519};
16use blueprint_keystore::{Keystore, KeystoreConfig};
17use blueprint_keystore::backends::Backend;
18use crate::services::TangleServicesClient;
19
20/// The [Config](subxt::Config) providing the runtime types.
21pub type TangleConfig = PolkadotConfig;
22/// The client used to perform API calls, using the [`TangleConfig`].
23pub type OnlineClient = subxt::OnlineClient<TangleConfig>;
24type TangleBlock = Block<TangleConfig, OnlineClient>;
25type TangleBlockStream = subxt::backend::StreamOfResults<TangleBlock>;
26
27#[derive(Clone, Debug)]
28pub struct TangleEvent {
29    /// Finalized block number.
30    pub number: u64,
31    /// Finalized block header hash.
32    pub hash: [u8; 32],
33    /// Events
34    pub events: Events<TangleConfig>,
35}
36
37#[derive(Clone)]
38pub struct TangleClient {
39    finality_notification_stream: Arc<tokio::sync::Mutex<Option<TangleBlockStream>>>,
40    latest_finality_notification: Arc<tokio::sync::Mutex<Option<TangleEvent>>>,
41    account_id: AccountId32,
42    pub config: BlueprintEnvironment,
43    keystore: Arc<Keystore>,
44    services_client: TangleServicesClient<TangleConfig>,
45}
46
47impl TangleClient {
48    /// Create a new Tangle runtime client from an existing [`BlueprintEnvironment`].
49    ///
50    /// # Errors
51    ///
52    /// See [`Keystore::new()`]
53    /// See [`Self::with_keystore()`]
54    pub async fn new(config: BlueprintEnvironment) -> std::result::Result<Self, Error> {
55        let keystore_config =
56            KeystoreConfig::new().fs_root(config.keystore_uri.replace("file://", ""));
57
58        Self::with_keystore(config, Keystore::new(keystore_config)?).await
59    }
60
61    /// Create a new Tangle runtime client from an existing [`BlueprintEnvironment`] and a [`Keystore`].
62    ///
63    /// # Errors
64    ///
65    /// See [`subxt::OnlineClient::from_url()`]
66    pub async fn with_keystore(
67        config: BlueprintEnvironment,
68        keystore: Keystore,
69    ) -> std::result::Result<Self, Error> {
70        let rpc_url = config.ws_rpc_endpoint.as_str();
71        let client =
72            TangleServicesClient::new(subxt::OnlineClient::from_insecure_url(rpc_url).await?);
73
74        let account_id = keystore
75            .first_local::<SpSr25519>()
76            .map_err(Error::Keystore)?
77            .0
78            .0
79            .into();
80
81        Ok(Self {
82            keystore: Arc::new(keystore),
83            services_client: client,
84            finality_notification_stream: Arc::new(tokio::sync::Mutex::new(None)),
85            latest_finality_notification: Arc::new(tokio::sync::Mutex::new(None)),
86            account_id,
87            config,
88        })
89    }
90
91    /// Get the associated [`TangleServicesClient`]
92    #[must_use]
93    pub fn services_client(&self) -> &TangleServicesClient<subxt::PolkadotConfig> {
94        &self.services_client
95    }
96
97    #[must_use]
98    pub fn subxt_client(&self) -> &OnlineClient {
99        &self.services_client().rpc_client
100    }
101
102    /// Initialize the `TangleRuntime` instance by listening for finality notifications.
103    ///
104    /// This method must be called before using the instance.
105    async fn initialize(&self) -> Result<()> {
106        let finality_notification_stream = self
107            .services_client()
108            .rpc_client
109            .blocks()
110            .subscribe_finalized()
111            .await?;
112        *self.finality_notification_stream.lock().await = Some(finality_notification_stream);
113        Ok(())
114    }
115
116    #[must_use]
117    pub fn runtime_api(
118        &self,
119        at: [u8; 32],
120    ) -> subxt::runtime_api::RuntimeApi<TangleConfig, OnlineClient> {
121        let block_ref = BlockRef::from_hash(subxt::utils::H256::from_slice(&at));
122        self.services_client.rpc_client.runtime_api().at(block_ref)
123    }
124
125    #[must_use]
126    pub fn account_id(&self) -> &AccountId32 {
127        &self.account_id
128    }
129
130    /// Get [`metadata`](OperatorMetadata) for an operator by [`Account ID`](AccountId32)
131    #[allow(clippy::missing_errors_doc)]
132    pub async fn operator_metadata(
133        &self,
134        operator: AccountId32,
135    ) -> std::result::Result<
136        Option<
137            OperatorMetadata<
138                AccountId32,
139                api::assets::events::burned::Balance,
140                api::assets::events::accounts_destroyed::AssetId,
141                api::runtime_types::tangle_testnet_runtime::MaxDelegations,
142                api::runtime_types::tangle_testnet_runtime::MaxOperatorBlueprints,
143            >,
144        >,
145        Error,
146    > {
147        let storage = self
148            .services_client
149            .rpc_client
150            .storage()
151            .at_latest()
152            .await?;
153        let metadata_storage_key = api::storage().multi_asset_delegation().operators(operator);
154
155        let ret = storage.fetch(&metadata_storage_key).await?;
156        Ok(ret)
157    }
158
159    /// Retrieves the current party index and operator mapping
160    ///
161    /// # Errors
162    /// Returns an error if:
163    /// - Failed to retrieve operator keys
164    /// - Current party is not found in the operator list
165    pub async fn get_party_index_and_operators(
166        &self,
167    ) -> std::result::Result<
168        (
169            usize,
170            std::collections::BTreeMap<AccountId32, ecdsa::Public>,
171        ),
172        Error,
173    > {
174        let parties = self.get_operators().await?;
175        let my_id = self
176            .keystore
177            .first_local::<SpSr25519>()
178            .map_err(Error::Keystore)?;
179
180        blueprint_core::trace!(
181            "Looking for {my_id} in parties: {:?}",
182            parties.keys().collect::<Vec<_>>()
183        );
184
185        let index_of_my_id = parties
186            .iter()
187            .position(|(id, _key)| id.0 == my_id.0.to_raw())
188            .ok_or(Error::PartyNotFound)?;
189
190        Ok((index_of_my_id, parties))
191    }
192
193    pub async fn now(&self) -> Option<[u8; 32]> {
194        Some(self.latest_event().await?.hash)
195    }
196}
197
198impl blueprint_std::ops::Deref for TangleClient {
199    type Target = TangleServicesClient<TangleConfig>;
200
201    fn deref(&self) -> &Self::Target {
202        &self.services_client
203    }
204}
205
206impl EventsClient<TangleEvent> for TangleClient {
207    async fn next_event(&self) -> Option<TangleEvent> {
208        let mut finality_stream = tokio::time::timeout(
209            Duration::from_millis(500),
210            self.finality_notification_stream.lock(),
211        )
212        .await
213        .ok()?;
214        match finality_stream.as_mut() {
215            Some(stream) => {
216                let block = stream.next().await?.ok()?;
217                let events = block.events().await.ok()?;
218                let notification = TangleEvent {
219                    number: block.number().into(),
220                    hash: block.hash().into(),
221                    events,
222                };
223                let mut latest_finality = tokio::time::timeout(
224                    Duration::from_millis(500),
225                    self.latest_finality_notification.lock(),
226                )
227                .await
228                .ok()?;
229                *latest_finality = Some(notification.clone());
230                Some(notification)
231            }
232            None => {
233                drop(finality_stream);
234                self.initialize().await.ok()?;
235                // Next time, the stream should be initialized.
236                Box::pin(async { self.next_event().await }).await
237            }
238        }
239    }
240
241    async fn latest_event(&self) -> Option<TangleEvent> {
242        let latest_finality = tokio::time::timeout(
243            Duration::from_millis(500),
244            self.latest_finality_notification.lock(),
245        )
246        .await
247        .ok()?;
248        match &*latest_finality {
249            Some(notification) => Some(notification.clone()),
250            None => {
251                drop(latest_finality);
252                self.next_event().await
253            }
254        }
255    }
256}
257
258pub type BlueprintId = u64;
259
260impl BlueprintServicesClient for TangleClient {
261    type PublicApplicationIdentity = ecdsa::Public;
262    type PublicAccountIdentity = AccountId32;
263    type Id = BlueprintId;
264    type Error = Error;
265
266    /// Retrieves the ECDSA keys for all current service operators
267    ///
268    /// # Errors
269    /// Returns an error if:
270    /// - Failed to connect to the Tangle client
271    /// - Failed to retrieve operator information
272    /// - Missing ECDSA key for any operator
273    async fn get_operators(
274        &self,
275    ) -> std::result::Result<
276        OperatorSet<Self::PublicAccountIdentity, Self::PublicApplicationIdentity>,
277        Self::Error,
278    > {
279        let client = &self.services_client;
280        let current_blueprint = self.blueprint_id().await?;
281        let service_id = self
282            .config
283            .protocol_settings
284            .tangle()
285            .map_err(|_| Error::NotTangle)?
286            .service_id
287            .ok_or_else(|| Error::Other("No service ID injected into config".into()))?;
288        let now = self
289            .now()
290            .await
291            .ok_or_else(|| Error::Other("no timestamp in latest".into()))?;
292        let current_service_op = self
293            .services_client
294            .current_service_operators(now, service_id)
295            .await?;
296        let storage = client.rpc_client.storage().at_latest().await?;
297
298        let mut map = std::collections::BTreeMap::new();
299        for (operator, _) in current_service_op {
300            let addr = api::storage()
301                .services()
302                .operators(current_blueprint, &operator);
303
304            let maybe_pref = storage.fetch(&addr).await.map_err(|err| {
305                Error::Other(format!(
306                    "Failed to fetch operator storage for {operator}: {err}"
307                ))
308            })?;
309
310            if let Some(pref) = maybe_pref {
311                let public = ecdsa::Public::from_full(pref.key.as_slice()).map_err(|()| {
312                    Error::Other(format!(
313                        "Failed to convert the ECDSA public key for operator: {operator}"
314                    ))
315                })?;
316
317                map.insert(operator, public);
318            } else {
319                return Err(Error::MissingEcdsa(operator));
320            }
321        }
322
323        Ok(map)
324    }
325
326    async fn operator_id(
327        &self,
328    ) -> std::result::Result<Self::PublicApplicationIdentity, Self::Error> {
329        Ok(self.keystore.first_local::<SpEcdsa>()?.0)
330    }
331
332    /// Retrieves the current blueprint ID from the configuration
333    ///
334    /// # Errors
335    /// Returns an error if the blueprint ID is not found in the configuration
336    async fn blueprint_id(&self) -> std::result::Result<Self::Id, Self::Error> {
337        let c = self
338            .config
339            .protocol_settings
340            .tangle()
341            .map_err(|_| Error::NotTangle)?;
342        Ok(c.blueprint_id)
343    }
344}