blueprint_client_tangle/
client.rs1use sp_core::ecdsa;
2use crate::error::{Result, Error};
3use crate::EventsClient;
4use blueprint_std::sync::Arc;
5use blueprint_std::time::Duration;
6use subxt::blocks::{Block, BlockRef};
7use subxt::events::Events;
8use subxt::utils::AccountId32;
9use subxt::PolkadotConfig;
10use tangle_subxt::subxt;
11use tangle_subxt::tangle_testnet_runtime::api;
12use tangle_subxt::tangle_testnet_runtime::api::runtime_types::pallet_multi_asset_delegation::types::operator::OperatorMetadata;
13use blueprint_client_core::{BlueprintServicesClient, OperatorSet};
14use blueprint_runner::config::BlueprintEnvironment;
15use blueprint_crypto_sp_core::{SpEcdsa, SpSr25519};
16use blueprint_keystore::{Keystore, KeystoreConfig};
17use blueprint_keystore::backends::Backend;
18use crate::services::TangleServicesClient;
19
20pub type TangleConfig = PolkadotConfig;
22pub type OnlineClient = subxt::OnlineClient<TangleConfig>;
24type TangleBlock = Block<TangleConfig, OnlineClient>;
25type TangleBlockStream = subxt::backend::StreamOfResults<TangleBlock>;
26
27#[derive(Clone, Debug)]
28pub struct TangleEvent {
29 pub number: u64,
31 pub hash: [u8; 32],
33 pub events: Events<TangleConfig>,
35}
36
37#[derive(Clone)]
38pub struct TangleClient {
39 finality_notification_stream: Arc<tokio::sync::Mutex<Option<TangleBlockStream>>>,
40 latest_finality_notification: Arc<tokio::sync::Mutex<Option<TangleEvent>>>,
41 account_id: AccountId32,
42 pub config: BlueprintEnvironment,
43 keystore: Arc<Keystore>,
44 services_client: TangleServicesClient<TangleConfig>,
45}
46
47impl TangleClient {
48 pub async fn new(config: BlueprintEnvironment) -> std::result::Result<Self, Error> {
55 let keystore_config =
56 KeystoreConfig::new().fs_root(config.keystore_uri.replace("file://", ""));
57
58 Self::with_keystore(config, Keystore::new(keystore_config)?).await
59 }
60
61 pub async fn with_keystore(
67 config: BlueprintEnvironment,
68 keystore: Keystore,
69 ) -> std::result::Result<Self, Error> {
70 let rpc_url = config.ws_rpc_endpoint.as_str();
71 let client =
72 TangleServicesClient::new(subxt::OnlineClient::from_insecure_url(rpc_url).await?);
73
74 let account_id = keystore
75 .first_local::<SpSr25519>()
76 .map_err(Error::Keystore)?
77 .0
78 .0
79 .into();
80
81 Ok(Self {
82 keystore: Arc::new(keystore),
83 services_client: client,
84 finality_notification_stream: Arc::new(tokio::sync::Mutex::new(None)),
85 latest_finality_notification: Arc::new(tokio::sync::Mutex::new(None)),
86 account_id,
87 config,
88 })
89 }
90
91 #[must_use]
93 pub fn services_client(&self) -> &TangleServicesClient<subxt::PolkadotConfig> {
94 &self.services_client
95 }
96
97 #[must_use]
98 pub fn subxt_client(&self) -> &OnlineClient {
99 &self.services_client().rpc_client
100 }
101
102 async fn initialize(&self) -> Result<()> {
106 let finality_notification_stream = self
107 .services_client()
108 .rpc_client
109 .blocks()
110 .subscribe_finalized()
111 .await?;
112 *self.finality_notification_stream.lock().await = Some(finality_notification_stream);
113 Ok(())
114 }
115
116 #[must_use]
117 pub fn runtime_api(
118 &self,
119 at: [u8; 32],
120 ) -> subxt::runtime_api::RuntimeApi<TangleConfig, OnlineClient> {
121 let block_ref = BlockRef::from_hash(subxt::utils::H256::from_slice(&at));
122 self.services_client.rpc_client.runtime_api().at(block_ref)
123 }
124
125 #[must_use]
126 pub fn account_id(&self) -> &AccountId32 {
127 &self.account_id
128 }
129
130 #[allow(clippy::missing_errors_doc)]
132 pub async fn operator_metadata(
133 &self,
134 operator: AccountId32,
135 ) -> std::result::Result<
136 Option<
137 OperatorMetadata<
138 AccountId32,
139 api::assets::events::burned::Balance,
140 api::assets::events::accounts_destroyed::AssetId,
141 api::runtime_types::tangle_testnet_runtime::MaxDelegations,
142 api::runtime_types::tangle_testnet_runtime::MaxOperatorBlueprints,
143 >,
144 >,
145 Error,
146 > {
147 let storage = self
148 .services_client
149 .rpc_client
150 .storage()
151 .at_latest()
152 .await?;
153 let metadata_storage_key = api::storage().multi_asset_delegation().operators(operator);
154
155 let ret = storage.fetch(&metadata_storage_key).await?;
156 Ok(ret)
157 }
158
159 pub async fn get_party_index_and_operators(
166 &self,
167 ) -> std::result::Result<
168 (
169 usize,
170 std::collections::BTreeMap<AccountId32, ecdsa::Public>,
171 ),
172 Error,
173 > {
174 let parties = self.get_operators().await?;
175 let my_id = self
176 .keystore
177 .first_local::<SpSr25519>()
178 .map_err(Error::Keystore)?;
179
180 blueprint_core::trace!(
181 "Looking for {my_id} in parties: {:?}",
182 parties.keys().collect::<Vec<_>>()
183 );
184
185 let index_of_my_id = parties
186 .iter()
187 .position(|(id, _key)| id.0 == my_id.0.to_raw())
188 .ok_or(Error::PartyNotFound)?;
189
190 Ok((index_of_my_id, parties))
191 }
192
193 pub async fn now(&self) -> Option<[u8; 32]> {
194 Some(self.latest_event().await?.hash)
195 }
196}
197
198impl blueprint_std::ops::Deref for TangleClient {
199 type Target = TangleServicesClient<TangleConfig>;
200
201 fn deref(&self) -> &Self::Target {
202 &self.services_client
203 }
204}
205
206impl EventsClient<TangleEvent> for TangleClient {
207 async fn next_event(&self) -> Option<TangleEvent> {
208 let mut finality_stream = tokio::time::timeout(
209 Duration::from_millis(500),
210 self.finality_notification_stream.lock(),
211 )
212 .await
213 .ok()?;
214 match finality_stream.as_mut() {
215 Some(stream) => {
216 let block = stream.next().await?.ok()?;
217 let events = block.events().await.ok()?;
218 let notification = TangleEvent {
219 number: block.number().into(),
220 hash: block.hash().into(),
221 events,
222 };
223 let mut latest_finality = tokio::time::timeout(
224 Duration::from_millis(500),
225 self.latest_finality_notification.lock(),
226 )
227 .await
228 .ok()?;
229 *latest_finality = Some(notification.clone());
230 Some(notification)
231 }
232 None => {
233 drop(finality_stream);
234 self.initialize().await.ok()?;
235 Box::pin(async { self.next_event().await }).await
237 }
238 }
239 }
240
241 async fn latest_event(&self) -> Option<TangleEvent> {
242 let latest_finality = tokio::time::timeout(
243 Duration::from_millis(500),
244 self.latest_finality_notification.lock(),
245 )
246 .await
247 .ok()?;
248 match &*latest_finality {
249 Some(notification) => Some(notification.clone()),
250 None => {
251 drop(latest_finality);
252 self.next_event().await
253 }
254 }
255 }
256}
257
/// Identifier of a blueprint; used as `BlueprintServicesClient::Id` for [`TangleClient`].
pub type BlueprintId = u64;
259
260impl BlueprintServicesClient for TangleClient {
261 type PublicApplicationIdentity = ecdsa::Public;
262 type PublicAccountIdentity = AccountId32;
263 type Id = BlueprintId;
264 type Error = Error;
265
266 async fn get_operators(
274 &self,
275 ) -> std::result::Result<
276 OperatorSet<Self::PublicAccountIdentity, Self::PublicApplicationIdentity>,
277 Self::Error,
278 > {
279 let client = &self.services_client;
280 let current_blueprint = self.blueprint_id().await?;
281 let service_id = self
282 .config
283 .protocol_settings
284 .tangle()
285 .map_err(|_| Error::NotTangle)?
286 .service_id
287 .ok_or_else(|| Error::Other("No service ID injected into config".into()))?;
288 let now = self
289 .now()
290 .await
291 .ok_or_else(|| Error::Other("no timestamp in latest".into()))?;
292 let current_service_op = self
293 .services_client
294 .current_service_operators(now, service_id)
295 .await?;
296 let storage = client.rpc_client.storage().at_latest().await?;
297
298 let mut map = std::collections::BTreeMap::new();
299 for (operator, _) in current_service_op {
300 let addr = api::storage()
301 .services()
302 .operators(current_blueprint, &operator);
303
304 let maybe_pref = storage.fetch(&addr).await.map_err(|err| {
305 Error::Other(format!(
306 "Failed to fetch operator storage for {operator}: {err}"
307 ))
308 })?;
309
310 if let Some(pref) = maybe_pref {
311 let public = ecdsa::Public::from_full(pref.key.as_slice()).map_err(|()| {
312 Error::Other(format!(
313 "Failed to convert the ECDSA public key for operator: {operator}"
314 ))
315 })?;
316
317 map.insert(operator, public);
318 } else {
319 return Err(Error::MissingEcdsa(operator));
320 }
321 }
322
323 Ok(map)
324 }
325
326 async fn operator_id(
327 &self,
328 ) -> std::result::Result<Self::PublicApplicationIdentity, Self::Error> {
329 Ok(self.keystore.first_local::<SpEcdsa>()?.0)
330 }
331
332 async fn blueprint_id(&self) -> std::result::Result<Self::Id, Self::Error> {
337 let c = self
338 .config
339 .protocol_settings
340 .tangle()
341 .map_err(|_| Error::NotTangle)?;
342 Ok(c.blueprint_id)
343 }
344}