1use std::{fmt::Debug, time::Duration};
2
3use async_trait::async_trait;
4use bs58;
5use light_merkle_tree_metadata::QueueType;
6use photon_api::{
7 apis::configuration::{ApiKey, Configuration},
8 models::GetCompressedAccountsByOwnerPostRequestParams,
9};
10use solana_pubkey::Pubkey;
11use tracing::{debug, error, warn};
12
13use super::{
14 types::{CompressedAccount, OwnerBalance, SignatureWithMetadata, TokenAccount, TokenBalance},
15 BatchAddressUpdateIndexerResponse, MerkleProofWithContext,
16};
17use crate::indexer::{
18 base58::Base58Conversions,
19 config::RetryConfig,
20 response::{Context, Items, ItemsWithCursor, Response},
21 Address, AddressWithTree, GetCompressedAccountsByOwnerConfig,
22 GetCompressedTokenAccountsByOwnerOrDelegateOptions, Hash, Indexer, IndexerError,
23 IndexerRpcConfig, MerkleProof, NewAddressProofWithContext, PaginatedOptions,
24};
25
26pub struct PhotonIndexer {
28 configuration: Configuration,
29}
30
31impl PhotonIndexer {
32 pub fn default_path() -> String {
33 "http://127.0.0.1:8784".to_string()
34 }
35}
36
37impl PhotonIndexer {
38 async fn retry<F, Fut, T>(
39 &self,
40 config: RetryConfig,
41 mut operation: F,
42 ) -> Result<T, IndexerError>
43 where
44 F: FnMut() -> Fut,
45 Fut: std::future::Future<Output = Result<T, IndexerError>>,
46 {
47 let max_retries = config.num_retries;
48 let mut attempts = 0;
49 let mut delay_ms = config.delay_ms;
50 let max_delay_ms = config.max_delay_ms;
51
52 loop {
53 attempts += 1;
54
55 debug!(
56 "Attempt {}/{}: No rate limiter configured",
57 attempts, max_retries
58 );
59
60 debug!("Attempt {}/{}: Executing operation", attempts, max_retries);
61 let result = operation().await;
62
63 match result {
64 Ok(value) => {
65 debug!("Attempt {}/{}: Operation succeeded.", attempts, max_retries);
66 return Ok(value);
67 }
68 Err(e) => {
69 let is_retryable = match &e {
70 IndexerError::ApiError(_) => {
71 warn!("API Error: {}", e);
72 true
73 }
74 IndexerError::PhotonError {
75 context: _,
76 message: _,
77 } => {
78 warn!("Operation failed, checking if retryable...");
79 true
80 }
81 IndexerError::IndexerNotSyncedToSlot => true,
82 IndexerError::Base58DecodeError { .. } => false,
83 IndexerError::AccountNotFound => false,
84 IndexerError::InvalidParameters(_) => false,
85 IndexerError::NotImplemented(_) => false,
86 _ => false,
87 };
88
89 if is_retryable && attempts < max_retries {
90 warn!(
91 "Attempt {}/{}: Operation failed. Retrying",
92 attempts, max_retries
93 );
94
95 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
96 delay_ms = std::cmp::min(delay_ms * 2, max_delay_ms);
97 } else {
98 if is_retryable {
99 error!("Operation failed after max retries.");
100 } else {
101 error!("Operation failed with non-retryable error.");
102 }
103 return Err(e);
104 }
105 }
106 }
107 }
108 }
109}
110
111impl PhotonIndexer {
112 pub fn new(path: String, api_key: Option<String>) -> Self {
113 let configuration = Configuration {
114 base_path: path,
115 api_key: api_key.map(|key| ApiKey {
116 prefix: Some("api-key".to_string()),
117 key,
118 }),
119 ..Default::default()
120 };
121
122 PhotonIndexer { configuration }
123 }
124
125 fn extract_result<T>(context: &str, result: Option<T>) -> Result<T, IndexerError> {
126 result.ok_or_else(|| IndexerError::missing_result(context, "value not present"))
127 }
128
129 fn extract_result_with_error_check<T>(
130 context: &str,
131 error: Option<Box<photon_api::models::GetBatchAddressUpdateInfoPost200ResponseError>>,
132 result: Option<T>,
133 ) -> Result<T, IndexerError> {
134 if let Some(error) = error {
135 let error_message = error
136 .message
137 .unwrap_or_else(|| "Unknown API error".to_string());
138 return Err(IndexerError::ApiError(format!(
139 "API error in {} (code: {:?}): {}",
140 context, error.code, error_message
141 )));
142 }
143
144 Self::extract_result(context, result)
145 }
146
147 fn build_account_params(
148 &self,
149 address: Option<Address>,
150 hash: Option<Hash>,
151 ) -> Result<photon_api::models::GetCompressedAccountPostRequestParams, IndexerError> {
152 match (address, hash) {
153 (None, None) => Err(IndexerError::InvalidParameters(
154 "Either address or hash must be provided".to_string(),
155 )),
156 (Some(_), Some(_)) => Err(IndexerError::InvalidParameters(
157 "Only one of address or hash must be provided".to_string(),
158 )),
159 (address, hash) => Ok(photon_api::models::GetCompressedAccountPostRequestParams {
160 address: address.map(|x| x.to_base58()),
161 hash: hash.map(|x| x.to_base58()),
162 }),
163 }
164 }
165}
166
167impl Debug for PhotonIndexer {
168 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
169 f.debug_struct("PhotonIndexer")
170 .field("configuration", &self.configuration)
171 .finish()
172 }
173}
174
175#[async_trait]
176impl Indexer for PhotonIndexer {
177 async fn get_compressed_account(
178 &self,
179 address: Address,
180 config: Option<IndexerRpcConfig>,
181 ) -> Result<Response<CompressedAccount>, IndexerError> {
182 let config = config.unwrap_or_default();
183 self.retry(config.retry_config, || async {
184 let params = self.build_account_params(Some(address), None)?;
185 let request = photon_api::models::GetCompressedAccountPostRequest {
186 params: Box::new(params),
187 ..Default::default()
188 };
189
190 let result = photon_api::apis::default_api::get_compressed_account_post(
191 &self.configuration,
192 request,
193 )
194 .await?;
195 let api_response = Self::extract_result_with_error_check(
196 "get_compressed_account",
197 result.error,
198 result.result.map(|r| *r),
199 )?;
200 if api_response.context.slot < config.slot {
201 return Err(IndexerError::IndexerNotSyncedToSlot);
202 }
203 let account_data = api_response
204 .value
205 .ok_or(IndexerError::AccountNotFound)
206 .map(|boxed| *boxed)?;
207 let account = CompressedAccount::try_from(&account_data)?;
208
209 Ok(Response {
210 context: Context {
211 slot: api_response.context.slot,
212 },
213 value: account,
214 })
215 })
216 .await
217 }
218
219 async fn get_compressed_account_by_hash(
220 &self,
221 hash: Hash,
222 config: Option<IndexerRpcConfig>,
223 ) -> Result<Response<CompressedAccount>, IndexerError> {
224 let config = config.unwrap_or_default();
225 self.retry(config.retry_config, || async {
226 let params = self.build_account_params(None, Some(hash))?;
227 let request = photon_api::models::GetCompressedAccountPostRequest {
228 params: Box::new(params),
229 ..Default::default()
230 };
231
232 let result = photon_api::apis::default_api::get_compressed_account_post(
233 &self.configuration,
234 request,
235 )
236 .await?;
237 let api_response = Self::extract_result_with_error_check(
238 "get_compressed_account_by_hash",
239 result.error,
240 result.result.map(|r| *r),
241 )?;
242 if api_response.context.slot < config.slot {
243 return Err(IndexerError::IndexerNotSyncedToSlot);
244 }
245 let account_data = api_response
246 .value
247 .ok_or(IndexerError::AccountNotFound)
248 .map(|boxed| *boxed)?;
249 let account = CompressedAccount::try_from(&account_data)?;
250
251 Ok(Response {
252 context: Context {
253 slot: api_response.context.slot,
254 },
255 value: account,
256 })
257 })
258 .await
259 }
260
261 async fn get_compressed_accounts_by_owner(
262 &self,
263 owner: &Pubkey,
264 options: Option<GetCompressedAccountsByOwnerConfig>,
265 config: Option<IndexerRpcConfig>,
266 ) -> Result<Response<ItemsWithCursor<CompressedAccount>>, IndexerError> {
267 let config = config.unwrap_or_default();
268 self.retry(config.retry_config, || async {
269 #[cfg(feature = "v2")]
270 {
271 let request = photon_api::models::GetCompressedAccountsByOwnerV2PostRequest {
272 params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
273 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
274 data_slice: options.as_ref().and_then(|o| {
275 o.data_slice.as_ref().map(|ds| {
276 Box::new(photon_api::models::DataSlice {
277 length: ds.length as u32,
278 offset: ds.offset as u32,
279 })
280 })
281 }),
282 filters: options.as_ref().and_then(|o| o.filters_to_photon()),
283 limit: options.as_ref().and_then(|o| o.limit),
284 owner: owner.to_string(),
285 }),
286 ..Default::default()
287 };
288 let result =
289 photon_api::apis::default_api::get_compressed_accounts_by_owner_v2_post(
290 &self.configuration,
291 request,
292 )
293 .await?;
294 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
295 if response.context.slot < config.slot {
296 return Err(IndexerError::IndexerNotSyncedToSlot);
297 }
298 let accounts: Result<Vec<_>, _> = response
299 .value
300 .items
301 .iter()
302 .map(CompressedAccount::try_from)
303 .collect();
304
305 let cursor = response.value.cursor;
306
307 Ok(Response {
308 context: Context {
309 slot: response.context.slot,
310 },
311 value: ItemsWithCursor {
312 items: accounts?,
313 cursor,
314 },
315 })
316 }
317 #[cfg(not(feature = "v2"))]
318 {
319 let request = photon_api::models::GetCompressedAccountsByOwnerPostRequest {
320 params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
321 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
322 data_slice: options.as_ref().and_then(|o| {
323 o.data_slice.as_ref().map(|ds| {
324 Box::new(photon_api::models::DataSlice {
325 length: ds.length as u32,
326 offset: ds.offset as u32,
327 })
328 })
329 }),
330 filters: options.as_ref().and_then(|o| o.filters_to_photon()),
331 limit: options.as_ref().and_then(|o| o.limit),
332 owner: owner.to_string(),
333 }),
334 ..Default::default()
335 };
336 let result = photon_api::apis::default_api::get_compressed_accounts_by_owner_post(
337 &self.configuration,
338 request,
339 )
340 .await?;
341 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
342 if response.context.slot < config.slot {
343 return Err(IndexerError::IndexerNotSyncedToSlot);
344 }
345 let accounts: Result<Vec<_>, _> = response
346 .value
347 .items
348 .iter()
349 .map(CompressedAccount::try_from)
350 .collect();
351
352 let cursor = response.value.cursor;
353
354 Ok(Response {
355 context: Context {
356 slot: response.context.slot,
357 },
358 value: ItemsWithCursor {
359 items: accounts?,
360 cursor,
361 },
362 })
363 }
364 })
365 .await
366 }
367
368 async fn get_compressed_balance(
369 &self,
370 address: Option<Address>,
371 hash: Option<Hash>,
372 config: Option<IndexerRpcConfig>,
373 ) -> Result<Response<u64>, IndexerError> {
374 let config = config.unwrap_or_default();
375 self.retry(config.retry_config, || async {
376 let params = self.build_account_params(address, hash)?;
377 let request = photon_api::models::GetCompressedAccountBalancePostRequest {
378 params: Box::new(params),
379 ..Default::default()
380 };
381
382 let result = photon_api::apis::default_api::get_compressed_account_balance_post(
383 &self.configuration,
384 request,
385 )
386 .await?;
387
388 let api_response = Self::extract_result_with_error_check(
389 "get_compressed_account_balance",
390 result.error,
391 result.result.map(|r| *r),
392 )?;
393 if api_response.context.slot < config.slot {
394 return Err(IndexerError::IndexerNotSyncedToSlot);
395 }
396 Ok(Response {
397 context: Context {
398 slot: api_response.context.slot,
399 },
400 value: api_response.value,
401 })
402 })
403 .await
404 }
405
406 async fn get_compressed_balance_by_owner(
407 &self,
408 owner: &Pubkey,
409 config: Option<IndexerRpcConfig>,
410 ) -> Result<Response<u64>, IndexerError> {
411 let config = config.unwrap_or_default();
412 self.retry(config.retry_config, || async {
413 let request = photon_api::models::GetCompressedBalanceByOwnerPostRequest {
414 params: Box::new(
415 photon_api::models::GetCompressedBalanceByOwnerPostRequestParams {
416 owner: owner.to_string(),
417 },
418 ),
419 ..Default::default()
420 };
421
422 let result = photon_api::apis::default_api::get_compressed_balance_by_owner_post(
423 &self.configuration,
424 request,
425 )
426 .await?;
427
428 let api_response = Self::extract_result_with_error_check(
429 "get_compressed_balance_by_owner",
430 result.error,
431 result.result.map(|r| *r),
432 )?;
433 if api_response.context.slot < config.slot {
434 return Err(IndexerError::IndexerNotSyncedToSlot);
435 }
436 Ok(Response {
437 context: Context {
438 slot: api_response.context.slot,
439 },
440 value: api_response.value,
441 })
442 })
443 .await
444 }
445
446 async fn get_compressed_mint_token_holders(
447 &self,
448 mint: &Pubkey,
449 options: Option<PaginatedOptions>,
450 config: Option<IndexerRpcConfig>,
451 ) -> Result<Response<ItemsWithCursor<OwnerBalance>>, IndexerError> {
452 let config = config.unwrap_or_default();
453 self.retry(config.retry_config, || async {
454 let request = photon_api::models::GetCompressedMintTokenHoldersPostRequest {
455 params: Box::new(
456 photon_api::models::GetCompressedMintTokenHoldersPostRequestParams {
457 mint: mint.to_string(),
458 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
459 limit: options.as_ref().and_then(|o| o.limit),
460 },
461 ),
462 ..Default::default()
463 };
464
465 let result = photon_api::apis::default_api::get_compressed_mint_token_holders_post(
466 &self.configuration,
467 request,
468 )
469 .await?;
470
471 let api_response = Self::extract_result_with_error_check(
472 "get_compressed_mint_token_holders",
473 result.error,
474 result.result.map(|r| *r),
475 )?;
476 if api_response.context.slot < config.slot {
477 return Err(IndexerError::IndexerNotSyncedToSlot);
478 }
479 let owner_balances: Result<Vec<_>, _> = api_response
480 .value
481 .items
482 .iter()
483 .map(OwnerBalance::try_from)
484 .collect();
485
486 let cursor = api_response.value.cursor;
487
488 Ok(Response {
489 context: Context {
490 slot: api_response.context.slot,
491 },
492 value: ItemsWithCursor {
493 items: owner_balances?,
494 cursor,
495 },
496 })
497 })
498 .await
499 }
500
501 async fn get_compressed_token_account_balance(
502 &self,
503 address: Option<Address>,
504 hash: Option<Hash>,
505 config: Option<IndexerRpcConfig>,
506 ) -> Result<Response<u64>, IndexerError> {
507 let config = config.unwrap_or_default();
508 self.retry(config.retry_config, || async {
509 let request = photon_api::models::GetCompressedTokenAccountBalancePostRequest {
510 params: Box::new(photon_api::models::GetCompressedAccountPostRequestParams {
511 address: address.map(|x| x.to_base58()),
512 hash: hash.map(|x| x.to_base58()),
513 }),
514 ..Default::default()
515 };
516
517 let result = photon_api::apis::default_api::get_compressed_token_account_balance_post(
518 &self.configuration,
519 request,
520 )
521 .await?;
522
523 let api_response = Self::extract_result_with_error_check(
524 "get_compressed_token_account_balance",
525 result.error,
526 result.result.map(|r| *r),
527 )?;
528 if api_response.context.slot < config.slot {
529 return Err(IndexerError::IndexerNotSyncedToSlot);
530 }
531 Ok(Response {
532 context: Context {
533 slot: api_response.context.slot,
534 },
535 value: api_response.value.amount,
536 })
537 })
538 .await
539 }
540
541 async fn get_compressed_token_accounts_by_delegate(
542 &self,
543 delegate: &Pubkey,
544 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
545 config: Option<IndexerRpcConfig>,
546 ) -> Result<Response<ItemsWithCursor<TokenAccount>>, IndexerError> {
547 let config = config.unwrap_or_default();
548 self.retry(config.retry_config, || async {
549 #[cfg(feature = "v2")]
550 {
551 let request = photon_api::models::GetCompressedTokenAccountsByDelegateV2PostRequest {
552 params: Box::new(
553 photon_api::models::GetCompressedTokenAccountsByDelegatePostRequestParams {
554 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
555 limit: options.as_ref().and_then(|o| o.limit),
556 mint: options.as_ref().and_then(|o| o.mint.as_ref()).map(|x| x.to_string()),
557 delegate: delegate.to_string(),
558 },
559 ),
560 ..Default::default()
561 };
562
563 let result = photon_api::apis::default_api::get_compressed_token_accounts_by_delegate_v2_post(
564 &self.configuration,
565 request,
566 )
567 .await?;
568
569 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
570 if response.context.slot < config.slot {
571 return Err(IndexerError::IndexerNotSyncedToSlot);
572 }
573
574 let token_accounts: Result<Vec<_>, _> = response
575 .value
576 .items
577 .iter()
578 .map(TokenAccount::try_from)
579 .collect();
580
581 let cursor = response.value.cursor;
582
583 Ok(Response {
584 context: Context {
585 slot: response.context.slot,
586 },
587 value: ItemsWithCursor {
588 items: token_accounts?,
589 cursor,
590 },
591 })
592 }
593 #[cfg(not(feature = "v2"))]
594 {
595 let request = photon_api::models::GetCompressedTokenAccountsByDelegatePostRequest {
596 params: Box::new(
597 photon_api::models::GetCompressedTokenAccountsByDelegatePostRequestParams {
598 delegate: delegate.to_string(),
599 mint: options.as_ref().and_then(|o| o.mint.as_ref()).map(|x| x.to_string()),
600 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
601 limit: options.as_ref().and_then(|o| o.limit),
602 },
603 ),
604 ..Default::default()
605 };
606
607 let result = photon_api::apis::default_api::get_compressed_token_accounts_by_delegate_post(
608 &self.configuration,
609 request,
610 )
611 .await?;
612
613 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
614 if response.context.slot < config.slot {
615 return Err(IndexerError::IndexerNotSyncedToSlot);
616 }
617
618 let token_accounts: Result<Vec<_>, _> = response
619 .value
620 .items
621 .iter()
622 .map(TokenAccount::try_from)
623 .collect();
624
625 let cursor = response.value.cursor;
626
627 Ok(Response {
628 context: Context {
629 slot: response.context.slot,
630 },
631 value: ItemsWithCursor {
632 items: token_accounts?,
633 cursor,
634 },
635 })
636 }
637 })
638 .await
639 }
640
641 async fn get_compressed_token_accounts_by_owner(
642 &self,
643 owner: &Pubkey,
644 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
645 config: Option<IndexerRpcConfig>,
646 ) -> Result<Response<ItemsWithCursor<TokenAccount>>, IndexerError> {
647 let config = config.unwrap_or_default();
648 self.retry(config.retry_config, || async {
649 #[cfg(feature = "v2")]
650 {
651 let request = photon_api::models::GetCompressedTokenAccountsByOwnerV2PostRequest {
652 params: Box::from(
653 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
654 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
655 limit: options.as_ref().and_then(|o| o.limit),
656 mint: options
657 .as_ref()
658 .and_then(|o| o.mint.as_ref())
659 .map(|x| x.to_string()),
660 owner: owner.to_string(),
661 },
662 ),
663 ..Default::default()
664 };
665 let result =
666 photon_api::apis::default_api::get_compressed_token_accounts_by_owner_v2_post(
667 &self.configuration,
668 request,
669 )
670 .await?;
671 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
672 if response.context.slot < config.slot {
673 return Err(IndexerError::IndexerNotSyncedToSlot);
674 }
675 let token_accounts: Result<Vec<_>, _> = response
676 .value
677 .items
678 .iter()
679 .map(TokenAccount::try_from)
680 .collect();
681
682 let cursor = response.value.cursor;
683
684 Ok(Response {
685 context: Context {
686 slot: response.context.slot,
687 },
688 value: ItemsWithCursor {
689 items: token_accounts?,
690 cursor,
691 },
692 })
693 }
694 #[cfg(not(feature = "v2"))]
695 {
696 let request = photon_api::models::GetCompressedTokenAccountsByOwnerPostRequest {
697 params: Box::new(
698 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
699 owner: owner.to_string(),
700 mint: options
701 .as_ref()
702 .and_then(|o| o.mint.as_ref())
703 .map(|x| x.to_string()),
704 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
705 limit: options.as_ref().and_then(|o| o.limit),
706 },
707 ),
708 ..Default::default()
709 };
710
711 let result =
712 photon_api::apis::default_api::get_compressed_token_accounts_by_owner_post(
713 &self.configuration,
714 request,
715 )
716 .await?;
717
718 let response = Self::extract_result_with_error_check(
719 "get_compressed_token_accounts_by_owner",
720 result.error,
721 result.result.map(|r| *r),
722 )?;
723 if response.context.slot < config.slot {
724 return Err(IndexerError::IndexerNotSyncedToSlot);
725 }
726 let token_accounts: Result<Vec<_>, _> = response
727 .value
728 .items
729 .iter()
730 .map(TokenAccount::try_from)
731 .collect();
732
733 let cursor = response.value.cursor;
734
735 Ok(Response {
736 context: Context {
737 slot: response.context.slot,
738 },
739 value: ItemsWithCursor {
740 items: token_accounts?,
741 cursor,
742 },
743 })
744 }
745 })
746 .await
747 }
748
749 async fn get_compressed_token_balances_by_owner_v2(
750 &self,
751 owner: &Pubkey,
752 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
753 config: Option<IndexerRpcConfig>,
754 ) -> Result<Response<ItemsWithCursor<TokenBalance>>, IndexerError> {
755 let config = config.unwrap_or_default();
756 self.retry(config.retry_config, || async {
757 #[cfg(feature = "v2")]
758 {
759 let request = photon_api::models::GetCompressedTokenBalancesByOwnerV2PostRequest {
760 params: Box::new(
761 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
762 owner: owner.to_string(),
763 mint: options
764 .as_ref()
765 .and_then(|o| o.mint.as_ref())
766 .map(|x| x.to_string()),
767 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
768 limit: options.as_ref().and_then(|o| o.limit),
769 },
770 ),
771 ..Default::default()
772 };
773
774 let result =
775 photon_api::apis::default_api::get_compressed_token_balances_by_owner_v2_post(
776 &self.configuration,
777 request,
778 )
779 .await?;
780
781 let api_response = Self::extract_result_with_error_check(
782 "get_compressed_token_balances_by_owner_v2",
783 result.error,
784 result.result.map(|r| *r),
785 )?;
786 if api_response.context.slot < config.slot {
787 return Err(IndexerError::IndexerNotSyncedToSlot);
788 }
789
790 let token_balances: Result<Vec<_>, _> = api_response
791 .value
792 .items
793 .iter()
794 .map(TokenBalance::try_from)
795 .collect();
796
797 Ok(Response {
798 context: Context {
799 slot: api_response.context.slot,
800 },
801 value: ItemsWithCursor {
802 items: token_balances?,
803 cursor: api_response.value.cursor,
804 },
805 })
806 }
807 #[cfg(not(feature = "v2"))]
808 {
809 let request = photon_api::models::GetCompressedTokenBalancesByOwnerPostRequest {
810 params: Box::new(
811 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
812 owner: owner.to_string(),
813 mint: options
814 .as_ref()
815 .and_then(|o| o.mint.as_ref())
816 .map(|x| x.to_string()),
817 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
818 limit: options.as_ref().and_then(|o| o.limit),
819 },
820 ),
821 ..Default::default()
822 };
823
824 let result =
825 photon_api::apis::default_api::get_compressed_token_balances_by_owner_post(
826 &self.configuration,
827 request,
828 )
829 .await?;
830
831 let api_response = Self::extract_result_with_error_check(
832 "get_compressed_token_balances_by_owner",
833 result.error,
834 result.result.map(|r| *r),
835 )?;
836 if api_response.context.slot < config.slot {
837 return Err(IndexerError::IndexerNotSyncedToSlot);
838 }
839
840 let token_balances: Result<Vec<_>, _> = api_response
841 .value
842 .token_balances
843 .iter()
844 .map(TokenBalance::try_from)
845 .collect();
846
847 Ok(Response {
848 context: Context {
849 slot: api_response.context.slot,
850 },
851 value: ItemsWithCursor {
852 items: token_balances?,
853 cursor: api_response.value.cursor,
854 },
855 })
856 }
857 })
858 .await
859 }
860
861 async fn get_compression_signatures_for_account(
862 &self,
863 hash: Hash,
864 config: Option<IndexerRpcConfig>,
865 ) -> Result<Response<Items<SignatureWithMetadata>>, IndexerError> {
866 let config = config.unwrap_or_default();
867 self.retry(config.retry_config, || async {
868 let request = photon_api::models::GetCompressionSignaturesForAccountPostRequest {
869 params: Box::new(
870 photon_api::models::GetCompressedAccountProofPostRequestParams {
871 hash: hash.to_base58(),
872 },
873 ),
874 ..Default::default()
875 };
876
877 let result =
878 photon_api::apis::default_api::get_compression_signatures_for_account_post(
879 &self.configuration,
880 request,
881 )
882 .await?;
883
884 let api_response = Self::extract_result_with_error_check(
885 "get_compression_signatures_for_account",
886 result.error,
887 result.result.map(|r| *r),
888 )?;
889 if api_response.context.slot < config.slot {
890 return Err(IndexerError::IndexerNotSyncedToSlot);
891 }
892 let signatures = api_response
893 .value
894 .items
895 .iter()
896 .map(SignatureWithMetadata::try_from)
897 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
898
899 Ok(Response {
900 context: Context {
901 slot: api_response.context.slot,
902 },
903 value: Items { items: signatures },
904 })
905 })
906 .await
907 }
908
909 async fn get_compression_signatures_for_address(
910 &self,
911 address: &[u8; 32],
912 options: Option<PaginatedOptions>,
913 config: Option<IndexerRpcConfig>,
914 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
915 let config = config.unwrap_or_default();
916 self.retry(config.retry_config, || async {
917 let request = photon_api::models::GetCompressionSignaturesForAddressPostRequest {
918 params: Box::new(
919 photon_api::models::GetCompressionSignaturesForAddressPostRequestParams {
920 address: address.to_base58(),
921 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
922 limit: options.as_ref().and_then(|o| o.limit),
923 },
924 ),
925 ..Default::default()
926 };
927
928 let result =
929 photon_api::apis::default_api::get_compression_signatures_for_address_post(
930 &self.configuration,
931 request,
932 )
933 .await?;
934
935 let api_response = Self::extract_result_with_error_check(
936 "get_compression_signatures_for_address",
937 result.error,
938 result.result.map(|r| *r),
939 )?;
940 if api_response.context.slot < config.slot {
941 return Err(IndexerError::IndexerNotSyncedToSlot);
942 }
943
944 let signatures = api_response
945 .value
946 .items
947 .iter()
948 .map(SignatureWithMetadata::try_from)
949 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
950
951 let cursor = api_response.value.cursor;
952
953 Ok(Response {
954 context: Context {
955 slot: api_response.context.slot,
956 },
957 value: ItemsWithCursor {
958 items: signatures,
959 cursor,
960 },
961 })
962 })
963 .await
964 }
965
966 async fn get_compression_signatures_for_owner(
967 &self,
968 owner: &Pubkey,
969 options: Option<PaginatedOptions>,
970 config: Option<IndexerRpcConfig>,
971 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
972 let config = config.unwrap_or_default();
973 self.retry(config.retry_config, || async {
974 let request = photon_api::models::GetCompressionSignaturesForOwnerPostRequest {
975 params: Box::new(
976 photon_api::models::GetCompressionSignaturesForOwnerPostRequestParams {
977 owner: owner.to_string(),
978 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
979 limit: options.as_ref().and_then(|o| o.limit),
980 },
981 ),
982 ..Default::default()
983 };
984
985 let result = photon_api::apis::default_api::get_compression_signatures_for_owner_post(
986 &self.configuration,
987 request,
988 )
989 .await?;
990
991 let api_response = Self::extract_result_with_error_check(
992 "get_compression_signatures_for_owner",
993 result.error,
994 result.result.map(|r| *r),
995 )?;
996 if api_response.context.slot < config.slot {
997 return Err(IndexerError::IndexerNotSyncedToSlot);
998 }
999
1000 let signatures = api_response
1001 .value
1002 .items
1003 .iter()
1004 .map(SignatureWithMetadata::try_from)
1005 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
1006
1007 let cursor = api_response.value.cursor;
1008
1009 Ok(Response {
1010 context: Context {
1011 slot: api_response.context.slot,
1012 },
1013 value: ItemsWithCursor {
1014 items: signatures,
1015 cursor,
1016 },
1017 })
1018 })
1019 .await
1020 }
1021
1022 async fn get_compression_signatures_for_token_owner(
1023 &self,
1024 owner: &Pubkey,
1025 options: Option<PaginatedOptions>,
1026 config: Option<IndexerRpcConfig>,
1027 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
1028 let config = config.unwrap_or_default();
1029 self.retry(config.retry_config, || async {
1030 let request = photon_api::models::GetCompressionSignaturesForTokenOwnerPostRequest {
1031 params: Box::new(
1032 photon_api::models::GetCompressionSignaturesForOwnerPostRequestParams {
1033 owner: owner.to_string(),
1034 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
1035 limit: options.as_ref().and_then(|o| o.limit),
1036 },
1037 ),
1038 ..Default::default()
1039 };
1040
1041 let result =
1042 photon_api::apis::default_api::get_compression_signatures_for_token_owner_post(
1043 &self.configuration,
1044 request,
1045 )
1046 .await?;
1047
1048 let api_response = Self::extract_result_with_error_check(
1049 "get_compression_signatures_for_token_owner",
1050 result.error,
1051 result.result.map(|r| *r),
1052 )?;
1053 if api_response.context.slot < config.slot {
1054 return Err(IndexerError::IndexerNotSyncedToSlot);
1055 }
1056
1057 let signatures = api_response
1058 .value
1059 .items
1060 .iter()
1061 .map(SignatureWithMetadata::try_from)
1062 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
1063
1064 let cursor = api_response.value.cursor;
1065
1066 Ok(Response {
1067 context: Context {
1068 slot: api_response.context.slot,
1069 },
1070 value: ItemsWithCursor {
1071 items: signatures,
1072 cursor,
1073 },
1074 })
1075 })
1076 .await
1077 }
1078
1079 async fn get_indexer_health(&self, config: Option<RetryConfig>) -> Result<bool, IndexerError> {
1080 let config = config.unwrap_or_default();
1081 self.retry(config, || async {
1082 let request = photon_api::models::GetIndexerHealthPostRequest {
1083 ..Default::default()
1084 };
1085
1086 let result = photon_api::apis::default_api::get_indexer_health_post(
1087 &self.configuration,
1088 request,
1089 )
1090 .await?;
1091
1092 let _api_response = Self::extract_result_with_error_check(
1093 "get_indexer_health",
1094 result.error,
1095 result.result,
1096 )?;
1097
1098 Ok(true)
1099 })
1100 .await
1101 }
1102
1103 async fn get_indexer_slot(&self, config: Option<RetryConfig>) -> Result<u64, IndexerError> {
1104 let config = config.unwrap_or_default();
1105 self.retry(config, || async {
1106 let request = photon_api::models::GetIndexerSlotPostRequest {
1107 ..Default::default()
1108 };
1109
1110 let result =
1111 photon_api::apis::default_api::get_indexer_slot_post(&self.configuration, request)
1112 .await?;
1113
1114 let result = Self::extract_result_with_error_check(
1115 "get_indexer_slot",
1116 result.error,
1117 result.result,
1118 )?;
1119 Ok(result)
1120 })
1121 .await
1122 }
1123
1124 async fn get_multiple_compressed_account_proofs(
1125 &self,
1126 hashes: Vec<[u8; 32]>,
1127 config: Option<IndexerRpcConfig>,
1128 ) -> Result<Response<Items<MerkleProof>>, IndexerError> {
1129 let config = config.unwrap_or_default();
1130 self.retry(config.retry_config, || async {
1131 let hashes_for_async = hashes.clone();
1132
1133 let request: photon_api::models::GetMultipleCompressedAccountProofsPostRequest =
1134 photon_api::models::GetMultipleCompressedAccountProofsPostRequest {
1135 params: hashes_for_async
1136 .into_iter()
1137 .map(|hash| bs58::encode(hash).into_string())
1138 .collect(),
1139 ..Default::default()
1140 };
1141
1142 debug!("API request: {:?}", request);
1143
1144 let result =
1145 photon_api::apis::default_api::get_multiple_compressed_account_proofs_post(
1146 &self.configuration,
1147 request,
1148 )
1149 .await?;
1150 debug!("Raw API response: {:?}", result);
1151
1152 if let Some(error) = &result.error {
1153 let error_msg = error.message.as_deref().unwrap_or("Unknown error");
1154 let error_code = error.code.unwrap_or(0);
1155 tracing::error!("API returned error: {}", error_msg);
1156 return Err(IndexerError::PhotonError {
1157 context: "get_multiple_compressed_account_proofs".to_string(),
1158 message: format!("API Error (code {}): {}", error_code, error_msg),
1159 });
1160 }
1161
1162 let photon_proofs = result.result.ok_or_else(|| {
1163 IndexerError::missing_result(
1164 "get_multiple_new_address_proofs",
1165 "No result returned from Photon API",
1166 )
1167 })?;
1168 if photon_proofs.context.slot < config.slot {
1169 return Err(IndexerError::IndexerNotSyncedToSlot);
1170 }
1171
1172 let proofs = photon_proofs
1173 .value
1174 .iter()
1175 .map(|x| {
1176 let mut proof_vec = x.proof.clone();
1177 proof_vec.truncate(proof_vec.len() - 10); let proof = proof_vec
1180 .iter()
1181 .map(|x| Hash::from_base58(x))
1182 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()
1183 .map_err(|e| IndexerError::Base58DecodeError {
1184 field: "proof".to_string(),
1185 message: e.to_string(),
1186 })?;
1187
1188 Ok(MerkleProof {
1189 hash: <[u8; 32] as Base58Conversions>::from_base58(&x.hash)?,
1190 leaf_index: x.leaf_index,
1191 merkle_tree: Pubkey::from_str_const(x.merkle_tree.as_str()),
1192 proof,
1193 root_seq: x.root_seq,
1194 root: [0u8; 32],
1195 })
1196 })
1197 .collect::<Result<Vec<MerkleProof>, IndexerError>>()?;
1198
1199 Ok(Response {
1200 context: Context {
1201 slot: photon_proofs.context.slot,
1202 },
1203 value: Items { items: proofs },
1204 })
1205 })
1206 .await
1207 }
1208
1209 async fn get_multiple_compressed_accounts(
1210 &self,
1211 addresses: Option<Vec<Address>>,
1212 hashes: Option<Vec<Hash>>,
1213 config: Option<IndexerRpcConfig>,
1214 ) -> Result<Response<Items<CompressedAccount>>, IndexerError> {
1215 let config = config.unwrap_or_default();
1216 self.retry(config.retry_config, || async {
1217 let hashes = hashes.clone();
1218 let addresses = addresses.clone();
1219 let request = photon_api::models::GetMultipleCompressedAccountsPostRequest {
1220 params: Box::new(
1221 photon_api::models::GetMultipleCompressedAccountsPostRequestParams {
1222 addresses: addresses.map(|x| x.iter().map(|x| x.to_base58()).collect()),
1223 hashes: hashes.map(|x| x.iter().map(|x| x.to_base58()).collect()),
1224 },
1225 ),
1226 ..Default::default()
1227 };
1228
1229 let result = photon_api::apis::default_api::get_multiple_compressed_accounts_post(
1230 &self.configuration,
1231 request,
1232 )
1233 .await?;
1234
1235 let api_response = Self::extract_result_with_error_check(
1236 "get_multiple_compressed_accounts",
1237 result.error,
1238 result.result.map(|r| *r),
1239 )?;
1240 if api_response.context.slot < config.slot {
1241 return Err(IndexerError::IndexerNotSyncedToSlot);
1242 }
1243 let accounts = api_response
1244 .value
1245 .items
1246 .iter()
1247 .map(CompressedAccount::try_from)
1248 .collect::<Result<Vec<CompressedAccount>, IndexerError>>()?;
1249
1250 Ok(Response {
1251 context: Context {
1252 slot: api_response.context.slot,
1253 },
1254 value: Items { items: accounts },
1255 })
1256 })
1257 .await
1258 }
1259
1260 async fn get_multiple_new_address_proofs(
1261 &self,
1262 merkle_tree_pubkey: [u8; 32],
1263 addresses: Vec<[u8; 32]>,
1264 config: Option<IndexerRpcConfig>,
1265 ) -> Result<Response<Items<NewAddressProofWithContext>>, IndexerError> {
1266 let config = config.unwrap_or_default();
1267 self.retry(config.retry_config, || async {
1268 let params: Vec<photon_api::models::address_with_tree::AddressWithTree> = addresses
1269 .iter()
1270 .map(|x| photon_api::models::address_with_tree::AddressWithTree {
1271 address: bs58::encode(x).into_string(),
1272 tree: bs58::encode(&merkle_tree_pubkey).into_string(),
1273 })
1274 .collect();
1275
1276 let request = photon_api::models::GetMultipleNewAddressProofsV2PostRequest {
1277 params,
1278 ..Default::default()
1279 };
1280
1281 let result = photon_api::apis::default_api::get_multiple_new_address_proofs_v2_post(
1282 &self.configuration,
1283 request,
1284 )
1285 .await;
1286
1287 match &result {
1288 Ok(response) => debug!("Raw API response: {:?}", response),
1289 Err(e) => error!("API request failed: {:?}", e),
1290 }
1291
1292 let result = result?;
1293
1294 let api_response = match Self::extract_result_with_error_check(
1295 "get_multiple_new_address_proofs",
1296 result.error,
1297 result.result.map(|r| *r),
1298 ) {
1299 Ok(proofs) => proofs,
1300 Err(e) => {
1301 error!("Failed to extract proofs: {:?}", e);
1302 return Err(e);
1303 }
1304 };
1305 if api_response.context.slot < config.slot {
1306 return Err(IndexerError::IndexerNotSyncedToSlot);
1307 }
1308 let photon_proofs = api_response.value;
1309 let mut proofs = Vec::new();
1310 for photon_proof in photon_proofs {
1311 let tree_pubkey = Hash::from_base58(&photon_proof.merkle_tree).map_err(|e| {
1312 IndexerError::Base58DecodeError {
1313 field: "merkle_tree".to_string(),
1314 message: e.to_string(),
1315 }
1316 })?;
1317
1318 let low_address_value = Hash::from_base58(&photon_proof.lower_range_address)
1319 .map_err(|e| IndexerError::Base58DecodeError {
1320 field: "lower_range_address".to_string(),
1321 message: e.to_string(),
1322 })?;
1323
1324 let next_address_value = Hash::from_base58(&photon_proof.higher_range_address)
1325 .map_err(|e| IndexerError::Base58DecodeError {
1326 field: "higher_range_address".to_string(),
1327 message: e.to_string(),
1328 })?;
1329
1330 let mut proof_vec: Vec<[u8; 32]> = photon_proof
1331 .proof
1332 .iter()
1333 .map(|x: &String| Hash::from_base58(x))
1334 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()?;
1335
1336 proof_vec.truncate(proof_vec.len() - 10); let mut proof_arr = [[0u8; 32]; 16];
1338 proof_arr.copy_from_slice(&proof_vec);
1339
1340 let root = Hash::from_base58(&photon_proof.root).map_err(|e| {
1341 IndexerError::Base58DecodeError {
1342 field: "root".to_string(),
1343 message: e.to_string(),
1344 }
1345 })?;
1346
1347 let proof = NewAddressProofWithContext {
1348 merkle_tree: tree_pubkey.into(),
1349 low_address_index: photon_proof.low_element_leaf_index,
1350 low_address_value,
1351 low_address_next_index: photon_proof.next_index,
1352 low_address_next_value: next_address_value,
1353 low_address_proof: proof_arr.to_vec(),
1354 root,
1355 root_seq: photon_proof.root_seq,
1356 new_low_element: None,
1357 new_element: None,
1358 new_element_next_value: None,
1359 };
1360 proofs.push(proof);
1361 }
1362
1363 Ok(Response {
1364 context: Context {
1365 slot: api_response.context.slot,
1366 },
1367 value: Items { items: proofs },
1368 })
1369 })
1370 .await
1371 }
1372
1373 async fn get_validity_proof(
1374 &self,
1375 hashes: Vec<Hash>,
1376 new_addresses_with_trees: Vec<AddressWithTree>,
1377 config: Option<IndexerRpcConfig>,
1378 ) -> Result<Response<super::types::ValidityProofWithContext>, IndexerError> {
1379 let config = config.unwrap_or_default();
1380 self.retry(config.retry_config, || async {
1381 #[cfg(feature = "v2")]
1382 {
1383 let request = photon_api::models::GetValidityProofV2PostRequest {
1384 params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
1385 hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
1386 new_addresses_with_trees: Some(
1387 new_addresses_with_trees
1388 .iter()
1389 .map(|x| photon_api::models::AddressWithTree {
1390 address: x.address.to_base58(),
1391 tree: x.tree.to_string(),
1392 })
1393 .collect(),
1394 ),
1395 }),
1396 ..Default::default()
1397 };
1398
1399 let result = photon_api::apis::default_api::get_validity_proof_v2_post(
1400 &self.configuration,
1401 request,
1402 )
1403 .await?;
1404 let api_response = Self::extract_result_with_error_check(
1405 "get_validity_proof_v2",
1406 result.error,
1407 result.result.map(|r| *r),
1408 )?;
1409 if api_response.context.slot < config.slot {
1410 return Err(IndexerError::IndexerNotSyncedToSlot);
1411 }
1412 let validity_proof =
1413 super::types::ValidityProofWithContext::from_api_model_v2(*api_response.value)?;
1414
1415 Ok(Response {
1416 context: Context {
1417 slot: api_response.context.slot,
1418 },
1419 value: validity_proof,
1420 })
1421 }
1422 #[cfg(not(feature = "v2"))]
1423 {
1424 let request = photon_api::models::GetValidityProofPostRequest {
1425 params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
1426 hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
1427 new_addresses_with_trees: Some(
1428 new_addresses_with_trees
1429 .iter()
1430 .map(|x| photon_api::models::AddressWithTree {
1431 address: x.address.to_base58(),
1432 tree: x.tree.to_string(),
1433 })
1434 .collect(),
1435 ),
1436 }),
1437 ..Default::default()
1438 };
1439
1440 let result = photon_api::apis::default_api::get_validity_proof_post(
1441 &self.configuration,
1442 request,
1443 )
1444 .await?;
1445
1446 let api_response = Self::extract_result_with_error_check(
1447 "get_validity_proof",
1448 result.error,
1449 result.result.map(|r| *r),
1450 )?;
1451 if api_response.context.slot < config.slot {
1452 return Err(IndexerError::IndexerNotSyncedToSlot);
1453 }
1454 let validity_proof = super::types::ValidityProofWithContext::from_api_model(
1455 *api_response.value,
1456 hashes.len(),
1457 )?;
1458
1459 Ok(Response {
1460 context: Context {
1461 slot: api_response.context.slot,
1462 },
1463 value: validity_proof,
1464 })
1465 }
1466 })
1467 .await
1468 }
1469
1470 async fn get_address_queue_with_proofs(
1471 &mut self,
1472 _merkle_tree_pubkey: &Pubkey,
1473 _zkp_batch_size: u16,
1474 _start_offset: Option<u64>,
1475 _config: Option<IndexerRpcConfig>,
1476 ) -> Result<Response<BatchAddressUpdateIndexerResponse>, IndexerError> {
1477 #[cfg(not(feature = "v2"))]
1478 unimplemented!("get_address_queue_with_proofs");
1479 #[cfg(feature = "v2")]
1480 {
1481 let merkle_tree_pubkey = _merkle_tree_pubkey;
1482 let limit = _zkp_batch_size;
1483 let start_queue_index = _start_offset;
1484 let config = _config.unwrap_or_default();
1485 self.retry(config.retry_config, || async {
1486 let merkle_tree = Hash::from_bytes(merkle_tree_pubkey.to_bytes().as_ref())?;
1487 let request = photon_api::models::GetBatchAddressUpdateInfoPostRequest {
1488 params: Box::new(
1489 photon_api::models::GetBatchAddressUpdateInfoPostRequestParams {
1490 limit,
1491 start_queue_index,
1492 tree: merkle_tree.to_base58(),
1493 },
1494 ),
1495 ..Default::default()
1496 };
1497
1498 let result = photon_api::apis::default_api::get_batch_address_update_info_post(
1499 &self.configuration,
1500 request,
1501 )
1502 .await?;
1503
1504 let api_response = Self::extract_result_with_error_check(
1505 "get_batch_address_update_info",
1506 result.error,
1507 result.result.map(|r| *r),
1508 )?;
1509 if api_response.context.slot < config.slot {
1510 return Err(IndexerError::IndexerNotSyncedToSlot);
1511 }
1512
1513 let addresses = api_response
1514 .addresses
1515 .iter()
1516 .map(|x| crate::indexer::AddressQueueIndex {
1517 address: Hash::from_base58(x.address.clone().as_ref()).unwrap(),
1518 queue_index: x.queue_index,
1519 })
1520 .collect();
1521
1522 let mut proofs: Vec<NewAddressProofWithContext> = vec![];
1523 for proof in api_response.non_inclusion_proofs {
1524 let proof = NewAddressProofWithContext {
1525 merkle_tree: *merkle_tree_pubkey,
1526 low_address_index: proof.low_element_leaf_index,
1527 low_address_value: Hash::from_base58(
1528 proof.lower_range_address.clone().as_ref(),
1529 )
1530 .unwrap(),
1531 low_address_next_index: proof.next_index,
1532 low_address_next_value: Hash::from_base58(
1533 proof.higher_range_address.clone().as_ref(),
1534 )
1535 .unwrap(),
1536 low_address_proof: proof
1537 .proof
1538 .iter()
1539 .map(|x| Hash::from_base58(x.clone().as_ref()).unwrap())
1540 .collect(),
1541 root: Hash::from_base58(proof.root.clone().as_ref()).unwrap(),
1542 root_seq: proof.root_seq,
1543
1544 new_low_element: None,
1545 new_element: None,
1546 new_element_next_value: None,
1547 };
1548 proofs.push(proof);
1549 }
1550
1551 let subtrees = api_response
1552 .subtrees
1553 .iter()
1554 .map(|x| {
1555 let mut arr = [0u8; 32];
1556 arr.copy_from_slice(x.as_slice());
1557 arr
1558 })
1559 .collect::<Vec<_>>();
1560
1561 let result = BatchAddressUpdateIndexerResponse {
1562 batch_start_index: api_response.start_index,
1563 addresses,
1564 non_inclusion_proofs: proofs,
1565 subtrees,
1566 };
1567 Ok(Response {
1568 context: Context {
1569 slot: api_response.context.slot,
1570 },
1571 value: result,
1572 })
1573 })
1574 .await
1575 }
1576 }
1577
1578 async fn get_queue_elements(
1579 &mut self,
1580 _pubkey: [u8; 32],
1581 _queue_type: QueueType,
1582 _num_elements: u16,
1583 _start_offset: Option<u64>,
1584 _config: Option<IndexerRpcConfig>,
1585 ) -> Result<Response<Items<MerkleProofWithContext>>, IndexerError> {
1586 #[cfg(not(feature = "v2"))]
1587 unimplemented!("get_queue_elements");
1588 #[cfg(feature = "v2")]
1589 {
1590 let pubkey = _pubkey;
1591 let queue_type = _queue_type;
1592 let limit = _num_elements;
1593 let start_queue_index = _start_offset;
1594 let config = _config.unwrap_or_default();
1595 self.retry(config.retry_config, || async {
1596 let request: photon_api::models::GetQueueElementsPostRequest =
1597 photon_api::models::GetQueueElementsPostRequest {
1598 params: Box::from(photon_api::models::GetQueueElementsPostRequestParams {
1599 tree: bs58::encode(pubkey).into_string(),
1600 queue_type: queue_type as u16,
1601 limit,
1602 start_queue_index,
1603 }),
1604 ..Default::default()
1605 };
1606 let result = photon_api::apis::default_api::get_queue_elements_post(
1607 &self.configuration,
1608 request,
1609 )
1610 .await;
1611
1612 let result: Result<Response<Items<MerkleProofWithContext>>, IndexerError> =
1613 match result {
1614 Ok(api_response) => match api_response.result {
1615 Some(api_result) => {
1616 if api_result.context.slot < config.slot {
1617 return Err(IndexerError::IndexerNotSyncedToSlot);
1618 }
1619 let response = api_result.value;
1620 let proofs: Vec<MerkleProofWithContext> = response
1621 .iter()
1622 .map(|x| {
1623 let proof = x
1624 .proof
1625 .iter()
1626 .map(|x| Hash::from_base58(x).unwrap())
1627 .collect();
1628 let root = Hash::from_base58(&x.root).unwrap();
1629 let leaf = Hash::from_base58(&x.leaf).unwrap();
1630 let merkle_tree = Hash::from_base58(&x.tree).unwrap();
1631 let tx_hash = x
1632 .tx_hash
1633 .as_ref()
1634 .map(|x| Hash::from_base58(x).unwrap());
1635 let account_hash =
1636 Hash::from_base58(&x.account_hash).unwrap();
1637
1638 MerkleProofWithContext {
1639 proof,
1640 root,
1641 leaf_index: x.leaf_index,
1642 leaf,
1643 merkle_tree,
1644 root_seq: x.root_seq,
1645 tx_hash,
1646 account_hash,
1647 }
1648 })
1649 .collect();
1650
1651 Ok(Response {
1652 context: Context {
1653 slot: api_result.context.slot,
1654 },
1655 value: Items { items: proofs },
1656 })
1657 }
1658 None => {
1659 let error = api_response.error.ok_or_else(|| {
1660 IndexerError::PhotonError {
1661 context: "get_queue_elements".to_string(),
1662 message: "No error details provided".to_string(),
1663 }
1664 })?;
1665
1666 Err(IndexerError::PhotonError {
1667 context: "get_queue_elements".to_string(),
1668 message: error
1669 .message
1670 .unwrap_or_else(|| "Unknown error".to_string()),
1671 })
1672 }
1673 },
1674 Err(e) => Err(IndexerError::PhotonError {
1675 context: "get_queue_elements".to_string(),
1676 message: e.to_string(),
1677 }),
1678 };
1679
1680 result
1681 })
1682 .await
1683 }
1684 }
1685
1686 async fn get_subtrees(
1687 &self,
1688 _merkle_tree_pubkey: [u8; 32],
1689 _config: Option<IndexerRpcConfig>,
1690 ) -> Result<Response<Items<[u8; 32]>>, IndexerError> {
1691 #[cfg(not(feature = "v2"))]
1692 unimplemented!();
1693 #[cfg(feature = "v2")]
1694 {
1695 todo!();
1696 }
1697 }
1698}