1use std::{fmt::Debug, time::Duration};
2
3use async_trait::async_trait;
4use bs58;
5use light_merkle_tree_metadata::QueueType;
6use photon_api::{
7 apis::configuration::{ApiKey, Configuration},
8 models::GetCompressedAccountsByOwnerPostRequestParams,
9};
10use solana_pubkey::Pubkey;
11use tracing::{error, trace, warn};
12
13use super::{
14 types::{
15 CompressedAccount, CompressedTokenAccount, OwnerBalance, QueueElementsResult,
16 SignatureWithMetadata, TokenBalance,
17 },
18 BatchAddressUpdateIndexerResponse,
19};
20use crate::indexer::{
21 base58::Base58Conversions,
22 config::RetryConfig,
23 response::{Context, Items, ItemsWithCursor, Response},
24 Address, AddressWithTree, GetCompressedAccountsByOwnerConfig,
25 GetCompressedTokenAccountsByOwnerOrDelegateOptions, Hash, Indexer, IndexerError,
26 IndexerRpcConfig, MerkleProof, NewAddressProofWithContext, PaginatedOptions,
27};
28
29pub struct PhotonIndexer {
31 configuration: Configuration,
32}
33
34impl PhotonIndexer {
35 pub fn default_path() -> String {
36 "http://127.0.0.1:8784".to_string()
37 }
38}
39
40impl PhotonIndexer {
41 async fn retry<F, Fut, T>(
42 &self,
43 config: RetryConfig,
44 mut operation: F,
45 ) -> Result<T, IndexerError>
46 where
47 F: FnMut() -> Fut,
48 Fut: std::future::Future<Output = Result<T, IndexerError>>,
49 {
50 let max_retries = config.num_retries;
51 let mut attempts = 0;
52 let mut delay_ms = config.delay_ms;
53 let max_delay_ms = config.max_delay_ms;
54
55 loop {
56 attempts += 1;
57
58 trace!(
59 "Attempt {}/{}: No rate limiter configured",
60 attempts,
61 max_retries
62 );
63
64 trace!("Attempt {}/{}: Executing operation", attempts, max_retries);
65 let result = operation().await;
66
67 match result {
68 Ok(value) => {
69 trace!("Attempt {}/{}: Operation succeeded.", attempts, max_retries);
70 return Ok(value);
71 }
72 Err(e) => {
73 let is_retryable = match &e {
74 IndexerError::ApiError(_) => {
75 warn!("API Error: {}", e);
76 true
77 }
78 IndexerError::PhotonError {
79 context: _,
80 message: _,
81 } => {
82 warn!("Operation failed, checking if retryable...");
83 true
84 }
85 IndexerError::IndexerNotSyncedToSlot => true,
86 IndexerError::Base58DecodeError { .. } => false,
87 IndexerError::AccountNotFound => false,
88 IndexerError::InvalidParameters(_) => false,
89 IndexerError::NotImplemented(_) => false,
90 _ => false,
91 };
92
93 if is_retryable && attempts < max_retries {
94 warn!(
95 "Attempt {}/{}: Operation failed. Retrying",
96 attempts, max_retries
97 );
98
99 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
100 delay_ms = std::cmp::min(delay_ms * 2, max_delay_ms);
101 } else {
102 if is_retryable {
103 error!("Operation failed after max retries.");
104 } else {
105 error!("Operation failed with non-retryable error.");
106 }
107 return Err(e);
108 }
109 }
110 }
111 }
112 }
113}
114
115impl PhotonIndexer {
116 pub fn new(path: String, api_key: Option<String>) -> Self {
117 let configuration = Configuration {
118 base_path: path,
119 api_key: api_key.map(|key| ApiKey {
120 prefix: Some("api-key".to_string()),
121 key,
122 }),
123 ..Default::default()
124 };
125
126 PhotonIndexer { configuration }
127 }
128
129 pub fn new_with_config(configuration: Configuration) -> Self {
130 PhotonIndexer { configuration }
131 }
132
133 fn extract_result<T>(context: &str, result: Option<T>) -> Result<T, IndexerError> {
134 result.ok_or_else(|| IndexerError::missing_result(context, "value not present"))
135 }
136
137 fn extract_result_with_error_check<T>(
138 context: &str,
139 error: Option<Box<photon_api::models::GetBatchAddressUpdateInfoPost200ResponseError>>,
140 result: Option<T>,
141 ) -> Result<T, IndexerError> {
142 if let Some(error) = error {
143 let error_message = error
144 .clone()
145 .message
146 .unwrap_or_else(|| format!("Unknown API error: {:?}", error).to_string());
147 return Err(IndexerError::ApiError(format!(
148 "API error in {} (code: {:?}): {}",
149 context, error.code, error_message
150 )));
151 }
152
153 Self::extract_result(context, result)
154 }
155
156 fn build_account_params(
157 &self,
158 address: Option<Address>,
159 hash: Option<Hash>,
160 ) -> Result<photon_api::models::GetCompressedAccountPostRequestParams, IndexerError> {
161 match (address, hash) {
162 (None, None) => Err(IndexerError::InvalidParameters(
163 "Either address or hash must be provided".to_string(),
164 )),
165 (Some(_), Some(_)) => Err(IndexerError::InvalidParameters(
166 "Only one of address or hash must be provided".to_string(),
167 )),
168 (address, hash) => Ok(photon_api::models::GetCompressedAccountPostRequestParams {
169 address: address.map(|x| x.to_base58()),
170 hash: hash.map(|x| x.to_base58()),
171 }),
172 }
173 }
174}
175
176impl Debug for PhotonIndexer {
177 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
178 f.debug_struct("PhotonIndexer")
179 .field("configuration", &self.configuration)
180 .finish()
181 }
182}
183
184#[async_trait]
185impl Indexer for PhotonIndexer {
186 async fn get_compressed_account(
187 &self,
188 address: Address,
189 config: Option<IndexerRpcConfig>,
190 ) -> Result<Response<Option<CompressedAccount>>, IndexerError> {
191 let config = config.unwrap_or_default();
192 self.retry(config.retry_config, || async {
193 let params = self.build_account_params(Some(address), None)?;
194 let request = photon_api::models::GetCompressedAccountPostRequest {
195 params: Box::new(params),
196 ..Default::default()
197 };
198
199 let result = photon_api::apis::default_api::get_compressed_account_post(
200 &self.configuration,
201 request,
202 )
203 .await?;
204 let api_response = Self::extract_result_with_error_check(
205 "get_compressed_account",
206 result.error,
207 result.result.map(|r| *r),
208 )?;
209 if api_response.context.slot < config.slot {
210 return Err(IndexerError::IndexerNotSyncedToSlot);
211 }
212 let account = match api_response.value {
213 Some(boxed) => Some(CompressedAccount::try_from(&*boxed)?),
214 None => None,
215 };
216
217 Ok(Response {
218 context: Context {
219 slot: api_response.context.slot,
220 },
221 value: account,
222 })
223 })
224 .await
225 }
226
227 async fn get_compressed_account_by_hash(
228 &self,
229 hash: Hash,
230 config: Option<IndexerRpcConfig>,
231 ) -> Result<Response<Option<CompressedAccount>>, IndexerError> {
232 let config = config.unwrap_or_default();
233 self.retry(config.retry_config, || async {
234 let params = self.build_account_params(None, Some(hash))?;
235 let request = photon_api::models::GetCompressedAccountPostRequest {
236 params: Box::new(params),
237 ..Default::default()
238 };
239
240 let result = photon_api::apis::default_api::get_compressed_account_post(
241 &self.configuration,
242 request,
243 )
244 .await?;
245 let api_response = Self::extract_result_with_error_check(
246 "get_compressed_account_by_hash",
247 result.error,
248 result.result.map(|r| *r),
249 )?;
250 if api_response.context.slot < config.slot {
251 return Err(IndexerError::IndexerNotSyncedToSlot);
252 }
253 let account = match api_response.value {
254 Some(boxed) => Some(CompressedAccount::try_from(&*boxed)?),
255 None => None,
256 };
257
258 Ok(Response {
259 context: Context {
260 slot: api_response.context.slot,
261 },
262 value: account,
263 })
264 })
265 .await
266 }
267
268 async fn get_compressed_accounts_by_owner(
269 &self,
270 owner: &Pubkey,
271 options: Option<GetCompressedAccountsByOwnerConfig>,
272 config: Option<IndexerRpcConfig>,
273 ) -> Result<Response<ItemsWithCursor<CompressedAccount>>, IndexerError> {
274 let config = config.unwrap_or_default();
275 self.retry(config.retry_config, || async {
276 #[cfg(feature = "v2")]
277 {
278 let request = photon_api::models::GetCompressedAccountsByOwnerV2PostRequest {
279 params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
280 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
281 data_slice: options.as_ref().and_then(|o| {
282 o.data_slice.as_ref().map(|ds| {
283 Box::new(photon_api::models::DataSlice {
284 length: ds.length as u32,
285 offset: ds.offset as u32,
286 })
287 })
288 }),
289 filters: options.as_ref().and_then(|o| o.filters_to_photon()),
290 limit: options.as_ref().and_then(|o| o.limit),
291 owner: owner.to_string(),
292 }),
293 ..Default::default()
294 };
295 let result =
296 photon_api::apis::default_api::get_compressed_accounts_by_owner_v2_post(
297 &self.configuration,
298 request,
299 )
300 .await?;
301 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
302 if response.context.slot < config.slot {
303 return Err(IndexerError::IndexerNotSyncedToSlot);
304 }
305 let accounts: Result<Vec<_>, _> = response
306 .value
307 .items
308 .iter()
309 .map(CompressedAccount::try_from)
310 .collect();
311
312 let cursor = response.value.cursor;
313
314 Ok(Response {
315 context: Context {
316 slot: response.context.slot,
317 },
318 value: ItemsWithCursor {
319 items: accounts?,
320 cursor,
321 },
322 })
323 }
324 #[cfg(not(feature = "v2"))]
325 {
326 let request = photon_api::models::GetCompressedAccountsByOwnerPostRequest {
327 params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
328 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
329 data_slice: options.as_ref().and_then(|o| {
330 o.data_slice.as_ref().map(|ds| {
331 Box::new(photon_api::models::DataSlice {
332 length: ds.length as u32,
333 offset: ds.offset as u32,
334 })
335 })
336 }),
337 filters: options.as_ref().and_then(|o| o.filters_to_photon()),
338 limit: options.as_ref().and_then(|o| o.limit),
339 owner: owner.to_string(),
340 }),
341 ..Default::default()
342 };
343 let result = photon_api::apis::default_api::get_compressed_accounts_by_owner_post(
344 &self.configuration,
345 request,
346 )
347 .await?;
348 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
349 if response.context.slot < config.slot {
350 return Err(IndexerError::IndexerNotSyncedToSlot);
351 }
352 let accounts: Result<Vec<_>, _> = response
353 .value
354 .items
355 .iter()
356 .map(CompressedAccount::try_from)
357 .collect();
358
359 let cursor = response.value.cursor;
360
361 Ok(Response {
362 context: Context {
363 slot: response.context.slot,
364 },
365 value: ItemsWithCursor {
366 items: accounts?,
367 cursor,
368 },
369 })
370 }
371 })
372 .await
373 }
374
375 async fn get_compressed_balance(
376 &self,
377 address: Option<Address>,
378 hash: Option<Hash>,
379 config: Option<IndexerRpcConfig>,
380 ) -> Result<Response<u64>, IndexerError> {
381 let config = config.unwrap_or_default();
382 self.retry(config.retry_config, || async {
383 let params = self.build_account_params(address, hash)?;
384 let request = photon_api::models::GetCompressedAccountBalancePostRequest {
385 params: Box::new(params),
386 ..Default::default()
387 };
388
389 let result = photon_api::apis::default_api::get_compressed_account_balance_post(
390 &self.configuration,
391 request,
392 )
393 .await?;
394
395 let api_response = Self::extract_result_with_error_check(
396 "get_compressed_account_balance",
397 result.error,
398 result.result.map(|r| *r),
399 )?;
400 if api_response.context.slot < config.slot {
401 return Err(IndexerError::IndexerNotSyncedToSlot);
402 }
403 Ok(Response {
404 context: Context {
405 slot: api_response.context.slot,
406 },
407 value: api_response.value,
408 })
409 })
410 .await
411 }
412
413 async fn get_compressed_balance_by_owner(
414 &self,
415 owner: &Pubkey,
416 config: Option<IndexerRpcConfig>,
417 ) -> Result<Response<u64>, IndexerError> {
418 let config = config.unwrap_or_default();
419 self.retry(config.retry_config, || async {
420 let request = photon_api::models::GetCompressedBalanceByOwnerPostRequest {
421 params: Box::new(
422 photon_api::models::GetCompressedBalanceByOwnerPostRequestParams {
423 owner: owner.to_string(),
424 },
425 ),
426 ..Default::default()
427 };
428
429 let result = photon_api::apis::default_api::get_compressed_balance_by_owner_post(
430 &self.configuration,
431 request,
432 )
433 .await?;
434
435 let api_response = Self::extract_result_with_error_check(
436 "get_compressed_balance_by_owner",
437 result.error,
438 result.result.map(|r| *r),
439 )?;
440 if api_response.context.slot < config.slot {
441 return Err(IndexerError::IndexerNotSyncedToSlot);
442 }
443 Ok(Response {
444 context: Context {
445 slot: api_response.context.slot,
446 },
447 value: api_response.value,
448 })
449 })
450 .await
451 }
452
453 async fn get_compressed_mint_token_holders(
454 &self,
455 mint: &Pubkey,
456 options: Option<PaginatedOptions>,
457 config: Option<IndexerRpcConfig>,
458 ) -> Result<Response<ItemsWithCursor<OwnerBalance>>, IndexerError> {
459 let config = config.unwrap_or_default();
460 self.retry(config.retry_config, || async {
461 let request = photon_api::models::GetCompressedMintTokenHoldersPostRequest {
462 params: Box::new(
463 photon_api::models::GetCompressedMintTokenHoldersPostRequestParams {
464 mint: mint.to_string(),
465 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
466 limit: options.as_ref().and_then(|o| o.limit),
467 },
468 ),
469 ..Default::default()
470 };
471
472 let result = photon_api::apis::default_api::get_compressed_mint_token_holders_post(
473 &self.configuration,
474 request,
475 )
476 .await?;
477
478 let api_response = Self::extract_result_with_error_check(
479 "get_compressed_mint_token_holders",
480 result.error,
481 result.result.map(|r| *r),
482 )?;
483 if api_response.context.slot < config.slot {
484 return Err(IndexerError::IndexerNotSyncedToSlot);
485 }
486 let owner_balances: Result<Vec<_>, _> = api_response
487 .value
488 .items
489 .iter()
490 .map(OwnerBalance::try_from)
491 .collect();
492
493 let cursor = api_response.value.cursor;
494
495 Ok(Response {
496 context: Context {
497 slot: api_response.context.slot,
498 },
499 value: ItemsWithCursor {
500 items: owner_balances?,
501 cursor,
502 },
503 })
504 })
505 .await
506 }
507
508 async fn get_compressed_token_account_balance(
509 &self,
510 address: Option<Address>,
511 hash: Option<Hash>,
512 config: Option<IndexerRpcConfig>,
513 ) -> Result<Response<u64>, IndexerError> {
514 let config = config.unwrap_or_default();
515 self.retry(config.retry_config, || async {
516 let request = photon_api::models::GetCompressedTokenAccountBalancePostRequest {
517 params: Box::new(photon_api::models::GetCompressedAccountPostRequestParams {
518 address: address.map(|x| x.to_base58()),
519 hash: hash.map(|x| x.to_base58()),
520 }),
521 ..Default::default()
522 };
523
524 let result = photon_api::apis::default_api::get_compressed_token_account_balance_post(
525 &self.configuration,
526 request,
527 )
528 .await?;
529
530 let api_response = Self::extract_result_with_error_check(
531 "get_compressed_token_account_balance",
532 result.error,
533 result.result.map(|r| *r),
534 )?;
535 if api_response.context.slot < config.slot {
536 return Err(IndexerError::IndexerNotSyncedToSlot);
537 }
538 Ok(Response {
539 context: Context {
540 slot: api_response.context.slot,
541 },
542 value: api_response.value.amount,
543 })
544 })
545 .await
546 }
547
548 async fn get_compressed_token_accounts_by_delegate(
549 &self,
550 delegate: &Pubkey,
551 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
552 config: Option<IndexerRpcConfig>,
553 ) -> Result<Response<ItemsWithCursor<CompressedTokenAccount>>, IndexerError> {
554 let config = config.unwrap_or_default();
555 self.retry(config.retry_config, || async {
556 #[cfg(feature = "v2")]
557 {
558 let request = photon_api::models::GetCompressedTokenAccountsByDelegateV2PostRequest {
559 params: Box::new(
560 photon_api::models::GetCompressedTokenAccountsByDelegatePostRequestParams {
561 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
562 limit: options.as_ref().and_then(|o| o.limit),
563 mint: options.as_ref().and_then(|o| o.mint.as_ref()).map(|x| x.to_string()),
564 delegate: delegate.to_string(),
565 },
566 ),
567 ..Default::default()
568 };
569
570 let result = photon_api::apis::default_api::get_compressed_token_accounts_by_delegate_v2_post(
571 &self.configuration,
572 request,
573 )
574 .await?;
575
576 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
577 if response.context.slot < config.slot {
578 return Err(IndexerError::IndexerNotSyncedToSlot);
579 }
580
581 let token_accounts: Result<Vec<_>, _> = response
582 .value
583 .items
584 .iter()
585 .map(CompressedTokenAccount::try_from)
586 .collect();
587
588 let cursor = response.value.cursor;
589
590 Ok(Response {
591 context: Context {
592 slot: response.context.slot,
593 },
594 value: ItemsWithCursor {
595 items: token_accounts?,
596 cursor,
597 },
598 })
599 }
600 #[cfg(not(feature = "v2"))]
601 {
602 let request = photon_api::models::GetCompressedTokenAccountsByDelegatePostRequest {
603 params: Box::new(
604 photon_api::models::GetCompressedTokenAccountsByDelegatePostRequestParams {
605 delegate: delegate.to_string(),
606 mint: options.as_ref().and_then(|o| o.mint.as_ref()).map(|x| x.to_string()),
607 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
608 limit: options.as_ref().and_then(|o| o.limit),
609 },
610 ),
611 ..Default::default()
612 };
613
614 let result = photon_api::apis::default_api::get_compressed_token_accounts_by_delegate_post(
615 &self.configuration,
616 request,
617 )
618 .await?;
619
620 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
621 if response.context.slot < config.slot {
622 return Err(IndexerError::IndexerNotSyncedToSlot);
623 }
624
625 let token_accounts: Result<Vec<_>, _> = response
626 .value
627 .items
628 .iter()
629 .map(CompressedTokenAccount::try_from)
630 .collect();
631
632 let cursor = response.value.cursor;
633
634 Ok(Response {
635 context: Context {
636 slot: response.context.slot,
637 },
638 value: ItemsWithCursor {
639 items: token_accounts?,
640 cursor,
641 },
642 })
643 }
644 })
645 .await
646 }
647
648 async fn get_compressed_token_accounts_by_owner(
649 &self,
650 owner: &Pubkey,
651 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
652 config: Option<IndexerRpcConfig>,
653 ) -> Result<Response<ItemsWithCursor<CompressedTokenAccount>>, IndexerError> {
654 let config = config.unwrap_or_default();
655 self.retry(config.retry_config, || async {
656 #[cfg(feature = "v2")]
657 {
658 let request = photon_api::models::GetCompressedTokenAccountsByOwnerV2PostRequest {
659 params: Box::from(
660 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
661 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
662 limit: options.as_ref().and_then(|o| o.limit),
663 mint: options
664 .as_ref()
665 .and_then(|o| o.mint.as_ref())
666 .map(|x| x.to_string()),
667 owner: owner.to_string(),
668 },
669 ),
670 ..Default::default()
671 };
672 let result =
673 photon_api::apis::default_api::get_compressed_token_accounts_by_owner_v2_post(
674 &self.configuration,
675 request,
676 )
677 .await?;
678 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
679 if response.context.slot < config.slot {
680 return Err(IndexerError::IndexerNotSyncedToSlot);
681 }
682 let token_accounts: Result<Vec<_>, _> = response
683 .value
684 .items
685 .iter()
686 .map(CompressedTokenAccount::try_from)
687 .collect();
688
689 let cursor = response.value.cursor;
690
691 Ok(Response {
692 context: Context {
693 slot: response.context.slot,
694 },
695 value: ItemsWithCursor {
696 items: token_accounts?,
697 cursor,
698 },
699 })
700 }
701 #[cfg(not(feature = "v2"))]
702 {
703 let request = photon_api::models::GetCompressedTokenAccountsByOwnerPostRequest {
704 params: Box::new(
705 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
706 owner: owner.to_string(),
707 mint: options
708 .as_ref()
709 .and_then(|o| o.mint.as_ref())
710 .map(|x| x.to_string()),
711 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
712 limit: options.as_ref().and_then(|o| o.limit),
713 },
714 ),
715 ..Default::default()
716 };
717
718 let result =
719 photon_api::apis::default_api::get_compressed_token_accounts_by_owner_post(
720 &self.configuration,
721 request,
722 )
723 .await?;
724
725 let response = Self::extract_result_with_error_check(
726 "get_compressed_token_accounts_by_owner",
727 result.error,
728 result.result.map(|r| *r),
729 )?;
730 if response.context.slot < config.slot {
731 return Err(IndexerError::IndexerNotSyncedToSlot);
732 }
733 let token_accounts: Result<Vec<_>, _> = response
734 .value
735 .items
736 .iter()
737 .map(CompressedTokenAccount::try_from)
738 .collect();
739
740 let cursor = response.value.cursor;
741
742 Ok(Response {
743 context: Context {
744 slot: response.context.slot,
745 },
746 value: ItemsWithCursor {
747 items: token_accounts?,
748 cursor,
749 },
750 })
751 }
752 })
753 .await
754 }
755
756 async fn get_compressed_token_balances_by_owner_v2(
757 &self,
758 owner: &Pubkey,
759 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
760 config: Option<IndexerRpcConfig>,
761 ) -> Result<Response<ItemsWithCursor<TokenBalance>>, IndexerError> {
762 let config = config.unwrap_or_default();
763 self.retry(config.retry_config, || async {
764 #[cfg(feature = "v2")]
765 {
766 let request = photon_api::models::GetCompressedTokenBalancesByOwnerV2PostRequest {
767 params: Box::new(
768 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
769 owner: owner.to_string(),
770 mint: options
771 .as_ref()
772 .and_then(|o| o.mint.as_ref())
773 .map(|x| x.to_string()),
774 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
775 limit: options.as_ref().and_then(|o| o.limit),
776 },
777 ),
778 ..Default::default()
779 };
780
781 let result =
782 photon_api::apis::default_api::get_compressed_token_balances_by_owner_v2_post(
783 &self.configuration,
784 request,
785 )
786 .await?;
787
788 let api_response = Self::extract_result_with_error_check(
789 "get_compressed_token_balances_by_owner_v2",
790 result.error,
791 result.result.map(|r| *r),
792 )?;
793 if api_response.context.slot < config.slot {
794 return Err(IndexerError::IndexerNotSyncedToSlot);
795 }
796
797 let token_balances: Result<Vec<_>, _> = api_response
798 .value
799 .items
800 .iter()
801 .map(TokenBalance::try_from)
802 .collect();
803
804 Ok(Response {
805 context: Context {
806 slot: api_response.context.slot,
807 },
808 value: ItemsWithCursor {
809 items: token_balances?,
810 cursor: api_response.value.cursor,
811 },
812 })
813 }
814 #[cfg(not(feature = "v2"))]
815 {
816 let request = photon_api::models::GetCompressedTokenBalancesByOwnerPostRequest {
817 params: Box::new(
818 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
819 owner: owner.to_string(),
820 mint: options
821 .as_ref()
822 .and_then(|o| o.mint.as_ref())
823 .map(|x| x.to_string()),
824 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
825 limit: options.as_ref().and_then(|o| o.limit),
826 },
827 ),
828 ..Default::default()
829 };
830
831 let result =
832 photon_api::apis::default_api::get_compressed_token_balances_by_owner_post(
833 &self.configuration,
834 request,
835 )
836 .await?;
837
838 let api_response = Self::extract_result_with_error_check(
839 "get_compressed_token_balances_by_owner",
840 result.error,
841 result.result.map(|r| *r),
842 )?;
843 if api_response.context.slot < config.slot {
844 return Err(IndexerError::IndexerNotSyncedToSlot);
845 }
846
847 let token_balances: Result<Vec<_>, _> = api_response
848 .value
849 .token_balances
850 .iter()
851 .map(TokenBalance::try_from)
852 .collect();
853
854 Ok(Response {
855 context: Context {
856 slot: api_response.context.slot,
857 },
858 value: ItemsWithCursor {
859 items: token_balances?,
860 cursor: api_response.value.cursor,
861 },
862 })
863 }
864 })
865 .await
866 }
867
868 async fn get_compression_signatures_for_account(
869 &self,
870 hash: Hash,
871 config: Option<IndexerRpcConfig>,
872 ) -> Result<Response<Items<SignatureWithMetadata>>, IndexerError> {
873 let config = config.unwrap_or_default();
874 self.retry(config.retry_config, || async {
875 let request = photon_api::models::GetCompressionSignaturesForAccountPostRequest {
876 params: Box::new(
877 photon_api::models::GetCompressedAccountProofPostRequestParams {
878 hash: hash.to_base58(),
879 },
880 ),
881 ..Default::default()
882 };
883
884 let result =
885 photon_api::apis::default_api::get_compression_signatures_for_account_post(
886 &self.configuration,
887 request,
888 )
889 .await?;
890
891 let api_response = Self::extract_result_with_error_check(
892 "get_compression_signatures_for_account",
893 result.error,
894 result.result.map(|r| *r),
895 )?;
896 if api_response.context.slot < config.slot {
897 return Err(IndexerError::IndexerNotSyncedToSlot);
898 }
899 let signatures = api_response
900 .value
901 .items
902 .iter()
903 .map(SignatureWithMetadata::try_from)
904 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
905
906 Ok(Response {
907 context: Context {
908 slot: api_response.context.slot,
909 },
910 value: Items { items: signatures },
911 })
912 })
913 .await
914 }
915
916 async fn get_compression_signatures_for_address(
917 &self,
918 address: &[u8; 32],
919 options: Option<PaginatedOptions>,
920 config: Option<IndexerRpcConfig>,
921 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
922 let config = config.unwrap_or_default();
923 self.retry(config.retry_config, || async {
924 let request = photon_api::models::GetCompressionSignaturesForAddressPostRequest {
925 params: Box::new(
926 photon_api::models::GetCompressionSignaturesForAddressPostRequestParams {
927 address: address.to_base58(),
928 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
929 limit: options.as_ref().and_then(|o| o.limit),
930 },
931 ),
932 ..Default::default()
933 };
934
935 let result =
936 photon_api::apis::default_api::get_compression_signatures_for_address_post(
937 &self.configuration,
938 request,
939 )
940 .await?;
941
942 let api_response = Self::extract_result_with_error_check(
943 "get_compression_signatures_for_address",
944 result.error,
945 result.result.map(|r| *r),
946 )?;
947 if api_response.context.slot < config.slot {
948 return Err(IndexerError::IndexerNotSyncedToSlot);
949 }
950
951 let signatures = api_response
952 .value
953 .items
954 .iter()
955 .map(SignatureWithMetadata::try_from)
956 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
957
958 let cursor = api_response.value.cursor;
959
960 Ok(Response {
961 context: Context {
962 slot: api_response.context.slot,
963 },
964 value: ItemsWithCursor {
965 items: signatures,
966 cursor,
967 },
968 })
969 })
970 .await
971 }
972
973 async fn get_compression_signatures_for_owner(
974 &self,
975 owner: &Pubkey,
976 options: Option<PaginatedOptions>,
977 config: Option<IndexerRpcConfig>,
978 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
979 let config = config.unwrap_or_default();
980 self.retry(config.retry_config, || async {
981 let request = photon_api::models::GetCompressionSignaturesForOwnerPostRequest {
982 params: Box::new(
983 photon_api::models::GetCompressionSignaturesForOwnerPostRequestParams {
984 owner: owner.to_string(),
985 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
986 limit: options.as_ref().and_then(|o| o.limit),
987 },
988 ),
989 ..Default::default()
990 };
991
992 let result = photon_api::apis::default_api::get_compression_signatures_for_owner_post(
993 &self.configuration,
994 request,
995 )
996 .await?;
997
998 let api_response = Self::extract_result_with_error_check(
999 "get_compression_signatures_for_owner",
1000 result.error,
1001 result.result.map(|r| *r),
1002 )?;
1003 if api_response.context.slot < config.slot {
1004 return Err(IndexerError::IndexerNotSyncedToSlot);
1005 }
1006
1007 let signatures = api_response
1008 .value
1009 .items
1010 .iter()
1011 .map(SignatureWithMetadata::try_from)
1012 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
1013
1014 let cursor = api_response.value.cursor;
1015
1016 Ok(Response {
1017 context: Context {
1018 slot: api_response.context.slot,
1019 },
1020 value: ItemsWithCursor {
1021 items: signatures,
1022 cursor,
1023 },
1024 })
1025 })
1026 .await
1027 }
1028
1029 async fn get_compression_signatures_for_token_owner(
1030 &self,
1031 owner: &Pubkey,
1032 options: Option<PaginatedOptions>,
1033 config: Option<IndexerRpcConfig>,
1034 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
1035 let config = config.unwrap_or_default();
1036 self.retry(config.retry_config, || async {
1037 let request = photon_api::models::GetCompressionSignaturesForTokenOwnerPostRequest {
1038 params: Box::new(
1039 photon_api::models::GetCompressionSignaturesForOwnerPostRequestParams {
1040 owner: owner.to_string(),
1041 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
1042 limit: options.as_ref().and_then(|o| o.limit),
1043 },
1044 ),
1045 ..Default::default()
1046 };
1047
1048 let result =
1049 photon_api::apis::default_api::get_compression_signatures_for_token_owner_post(
1050 &self.configuration,
1051 request,
1052 )
1053 .await?;
1054
1055 let api_response = Self::extract_result_with_error_check(
1056 "get_compression_signatures_for_token_owner",
1057 result.error,
1058 result.result.map(|r| *r),
1059 )?;
1060 if api_response.context.slot < config.slot {
1061 return Err(IndexerError::IndexerNotSyncedToSlot);
1062 }
1063
1064 let signatures = api_response
1065 .value
1066 .items
1067 .iter()
1068 .map(SignatureWithMetadata::try_from)
1069 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
1070
1071 let cursor = api_response.value.cursor;
1072
1073 Ok(Response {
1074 context: Context {
1075 slot: api_response.context.slot,
1076 },
1077 value: ItemsWithCursor {
1078 items: signatures,
1079 cursor,
1080 },
1081 })
1082 })
1083 .await
1084 }
1085
1086 async fn get_indexer_health(&self, config: Option<RetryConfig>) -> Result<bool, IndexerError> {
1087 let config = config.unwrap_or_default();
1088 self.retry(config, || async {
1089 let request = photon_api::models::GetIndexerHealthPostRequest {
1090 ..Default::default()
1091 };
1092
1093 let result = photon_api::apis::default_api::get_indexer_health_post(
1094 &self.configuration,
1095 request,
1096 )
1097 .await?;
1098
1099 let _api_response = Self::extract_result_with_error_check(
1100 "get_indexer_health",
1101 result.error,
1102 result.result,
1103 )?;
1104
1105 Ok(true)
1106 })
1107 .await
1108 }
1109
1110 async fn get_indexer_slot(&self, config: Option<RetryConfig>) -> Result<u64, IndexerError> {
1111 let config = config.unwrap_or_default();
1112 self.retry(config, || async {
1113 let request = photon_api::models::GetIndexerSlotPostRequest {
1114 ..Default::default()
1115 };
1116
1117 let result =
1118 photon_api::apis::default_api::get_indexer_slot_post(&self.configuration, request)
1119 .await?;
1120
1121 let result = Self::extract_result_with_error_check(
1122 "get_indexer_slot",
1123 result.error,
1124 result.result,
1125 )?;
1126 Ok(result)
1127 })
1128 .await
1129 }
1130
1131 async fn get_multiple_compressed_account_proofs(
1132 &self,
1133 hashes: Vec<[u8; 32]>,
1134 config: Option<IndexerRpcConfig>,
1135 ) -> Result<Response<Items<MerkleProof>>, IndexerError> {
1136 let config = config.unwrap_or_default();
1137 self.retry(config.retry_config, || async {
1138 let hashes_for_async = hashes.clone();
1139
1140 let request: photon_api::models::GetMultipleCompressedAccountProofsPostRequest =
1141 photon_api::models::GetMultipleCompressedAccountProofsPostRequest {
1142 params: hashes_for_async
1143 .into_iter()
1144 .map(|hash| bs58::encode(hash).into_string())
1145 .collect(),
1146 ..Default::default()
1147 };
1148
1149 let result =
1150 photon_api::apis::default_api::get_multiple_compressed_account_proofs_post(
1151 &self.configuration,
1152 request,
1153 )
1154 .await?;
1155
1156 if let Some(error) = &result.error {
1157 let error_msg = error.message.as_deref().unwrap_or("Unknown error");
1158 let error_code = error.code.unwrap_or(0);
1159 tracing::error!("API returned error: {}", error_msg);
1160 return Err(IndexerError::PhotonError {
1161 context: "get_multiple_compressed_account_proofs".to_string(),
1162 message: format!("API Error (code {}): {}", error_code, error_msg),
1163 });
1164 }
1165
1166 let photon_proofs = result.result.ok_or_else(|| {
1167 IndexerError::missing_result(
1168 "get_multiple_new_address_proofs",
1169 "No result returned from Photon API",
1170 )
1171 })?;
1172 if photon_proofs.context.slot < config.slot {
1173 return Err(IndexerError::IndexerNotSyncedToSlot);
1174 }
1175
1176 let proofs = photon_proofs
1177 .value
1178 .iter()
1179 .map(|x| {
1180 let mut proof_vec = x.proof.clone();
1181 proof_vec.truncate(proof_vec.len() - 10); let proof = proof_vec
1184 .iter()
1185 .map(|x| Hash::from_base58(x))
1186 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()
1187 .map_err(|e| IndexerError::Base58DecodeError {
1188 field: "proof".to_string(),
1189 message: e.to_string(),
1190 })?;
1191
1192 Ok(MerkleProof {
1193 hash: <[u8; 32] as Base58Conversions>::from_base58(&x.hash)?,
1194 leaf_index: x.leaf_index,
1195 merkle_tree: Pubkey::from_str_const(x.merkle_tree.as_str()),
1196 proof,
1197 root_seq: x.root_seq,
1198 root: <[u8; 32] as Base58Conversions>::from_base58(&x.root)?,
1199 })
1200 })
1201 .collect::<Result<Vec<MerkleProof>, IndexerError>>()?;
1202
1203 Ok(Response {
1204 context: Context {
1205 slot: photon_proofs.context.slot,
1206 },
1207 value: Items { items: proofs },
1208 })
1209 })
1210 .await
1211 }
1212
1213 async fn get_multiple_compressed_accounts(
1214 &self,
1215 addresses: Option<Vec<Address>>,
1216 hashes: Option<Vec<Hash>>,
1217 config: Option<IndexerRpcConfig>,
1218 ) -> Result<Response<Items<Option<CompressedAccount>>>, IndexerError> {
1219 let config = config.unwrap_or_default();
1220 self.retry(config.retry_config, || async {
1221 let hashes = hashes.clone();
1222 let addresses = addresses.clone();
1223 let request = photon_api::models::GetMultipleCompressedAccountsPostRequest {
1224 params: Box::new(
1225 photon_api::models::GetMultipleCompressedAccountsPostRequestParams {
1226 addresses: addresses.map(|x| x.iter().map(|x| x.to_base58()).collect()),
1227 hashes: hashes.map(|x| x.iter().map(|x| x.to_base58()).collect()),
1228 },
1229 ),
1230 ..Default::default()
1231 };
1232
1233 let result = photon_api::apis::default_api::get_multiple_compressed_accounts_post(
1234 &self.configuration,
1235 request,
1236 )
1237 .await?;
1238
1239 let api_response = Self::extract_result_with_error_check(
1240 "get_multiple_compressed_accounts",
1241 result.error,
1242 result.result.map(|r| *r),
1243 )?;
1244 if api_response.context.slot < config.slot {
1245 return Err(IndexerError::IndexerNotSyncedToSlot);
1246 }
1247 let accounts = api_response
1248 .value
1249 .items
1250 .iter()
1251 .map(|account_opt| match account_opt {
1252 Some(account) => CompressedAccount::try_from(account).map(Some),
1253 None => Ok(None),
1254 })
1255 .collect::<Result<Vec<Option<CompressedAccount>>, IndexerError>>()?;
1256
1257 Ok(Response {
1258 context: Context {
1259 slot: api_response.context.slot,
1260 },
1261 value: Items { items: accounts },
1262 })
1263 })
1264 .await
1265 }
1266
1267 async fn get_multiple_new_address_proofs(
1268 &self,
1269 merkle_tree_pubkey: [u8; 32],
1270 addresses: Vec<[u8; 32]>,
1271 config: Option<IndexerRpcConfig>,
1272 ) -> Result<Response<Items<NewAddressProofWithContext>>, IndexerError> {
1273 let config = config.unwrap_or_default();
1274 self.retry(config.retry_config, || async {
1275 let params: Vec<photon_api::models::address_with_tree::AddressWithTree> = addresses
1276 .iter()
1277 .map(|x| photon_api::models::address_with_tree::AddressWithTree {
1278 address: bs58::encode(x).into_string(),
1279 tree: bs58::encode(&merkle_tree_pubkey).into_string(),
1280 })
1281 .collect();
1282
1283 let request = photon_api::models::GetMultipleNewAddressProofsV2PostRequest {
1284 params,
1285 ..Default::default()
1286 };
1287
1288 let result = photon_api::apis::default_api::get_multiple_new_address_proofs_v2_post(
1289 &self.configuration,
1290 request,
1291 )
1292 .await;
1293
1294 let result = result?;
1295
1296 let api_response = match Self::extract_result_with_error_check(
1297 "get_multiple_new_address_proofs",
1298 result.error,
1299 result.result.map(|r| *r),
1300 ) {
1301 Ok(proofs) => proofs,
1302 Err(e) => {
1303 error!("Failed to extract proofs: {:?}", e);
1304 return Err(e);
1305 }
1306 };
1307 if api_response.context.slot < config.slot {
1308 return Err(IndexerError::IndexerNotSyncedToSlot);
1309 }
1310 let photon_proofs = api_response.value;
1311 let mut proofs = Vec::new();
1312 for photon_proof in photon_proofs {
1313 let tree_pubkey = Hash::from_base58(&photon_proof.merkle_tree).map_err(|e| {
1314 IndexerError::Base58DecodeError {
1315 field: "merkle_tree".to_string(),
1316 message: e.to_string(),
1317 }
1318 })?;
1319
1320 let low_address_value = Hash::from_base58(&photon_proof.lower_range_address)
1321 .map_err(|e| IndexerError::Base58DecodeError {
1322 field: "lower_range_address".to_string(),
1323 message: e.to_string(),
1324 })?;
1325
1326 let next_address_value = Hash::from_base58(&photon_proof.higher_range_address)
1327 .map_err(|e| IndexerError::Base58DecodeError {
1328 field: "higher_range_address".to_string(),
1329 message: e.to_string(),
1330 })?;
1331
1332 let mut proof_vec: Vec<[u8; 32]> = photon_proof
1333 .proof
1334 .iter()
1335 .map(|x: &String| Hash::from_base58(x))
1336 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()?;
1337
1338 proof_vec.truncate(proof_vec.len() - 10); let mut proof_arr = [[0u8; 32]; 16];
1340 proof_arr.copy_from_slice(&proof_vec);
1341
1342 let root = Hash::from_base58(&photon_proof.root).map_err(|e| {
1343 IndexerError::Base58DecodeError {
1344 field: "root".to_string(),
1345 message: e.to_string(),
1346 }
1347 })?;
1348
1349 let proof = NewAddressProofWithContext {
1350 merkle_tree: tree_pubkey.into(),
1351 low_address_index: photon_proof.low_element_leaf_index,
1352 low_address_value,
1353 low_address_next_index: photon_proof.next_index,
1354 low_address_next_value: next_address_value,
1355 low_address_proof: proof_arr.to_vec(),
1356 root,
1357 root_seq: photon_proof.root_seq,
1358 new_low_element: None,
1359 new_element: None,
1360 new_element_next_value: None,
1361 };
1362 proofs.push(proof);
1363 }
1364
1365 Ok(Response {
1366 context: Context {
1367 slot: api_response.context.slot,
1368 },
1369 value: Items { items: proofs },
1370 })
1371 })
1372 .await
1373 }
1374
1375 async fn get_validity_proof(
1376 &self,
1377 hashes: Vec<Hash>,
1378 new_addresses_with_trees: Vec<AddressWithTree>,
1379 config: Option<IndexerRpcConfig>,
1380 ) -> Result<Response<super::types::ValidityProofWithContext>, IndexerError> {
1381 let config = config.unwrap_or_default();
1382 self.retry(config.retry_config, || async {
1383 #[cfg(feature = "v2")]
1384 {
1385 let request = photon_api::models::GetValidityProofV2PostRequest {
1386 params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
1387 hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
1388 new_addresses_with_trees: Some(
1389 new_addresses_with_trees
1390 .iter()
1391 .map(|x| photon_api::models::AddressWithTree {
1392 address: x.address.to_base58(),
1393 tree: x.tree.to_string(),
1394 })
1395 .collect(),
1396 ),
1397 }),
1398 ..Default::default()
1399 };
1400
1401 let result = photon_api::apis::default_api::get_validity_proof_v2_post(
1402 &self.configuration,
1403 request,
1404 )
1405 .await?;
1406
1407 let api_response = Self::extract_result_with_error_check(
1408 "get_validity_proof_v2",
1409 result.error,
1410 result.result.map(|r| *r),
1411 )?;
1412 if api_response.context.slot < config.slot {
1413 return Err(IndexerError::IndexerNotSyncedToSlot);
1414 }
1415 let validity_proof =
1416 super::types::ValidityProofWithContext::from_api_model_v2(*api_response.value)?;
1417
1418 Ok(Response {
1419 context: Context {
1420 slot: api_response.context.slot,
1421 },
1422 value: validity_proof,
1423 })
1424 }
1425 #[cfg(not(feature = "v2"))]
1426 {
1427 let request = photon_api::models::GetValidityProofPostRequest {
1428 params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
1429 hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
1430 new_addresses_with_trees: Some(
1431 new_addresses_with_trees
1432 .iter()
1433 .map(|x| photon_api::models::AddressWithTree {
1434 address: x.address.to_base58(),
1435 tree: x.tree.to_string(),
1436 })
1437 .collect(),
1438 ),
1439 }),
1440 ..Default::default()
1441 };
1442
1443 let result = photon_api::apis::default_api::get_validity_proof_post(
1444 &self.configuration,
1445 request,
1446 )
1447 .await?;
1448
1449 let api_response = Self::extract_result_with_error_check(
1450 "get_validity_proof",
1451 result.error,
1452 result.result.map(|r| *r),
1453 )?;
1454 if api_response.context.slot < config.slot {
1455 return Err(IndexerError::IndexerNotSyncedToSlot);
1456 }
1457 let validity_proof = super::types::ValidityProofWithContext::from_api_model(
1458 *api_response.value,
1459 hashes.len(),
1460 )?;
1461
1462 Ok(Response {
1463 context: Context {
1464 slot: api_response.context.slot,
1465 },
1466 value: validity_proof,
1467 })
1468 }
1469 })
1470 .await
1471 }
1472
1473 async fn get_address_queue_with_proofs(
1474 &mut self,
1475 _merkle_tree_pubkey: &Pubkey,
1476 _zkp_batch_size: u16,
1477 _start_offset: Option<u64>,
1478 _config: Option<IndexerRpcConfig>,
1479 ) -> Result<Response<BatchAddressUpdateIndexerResponse>, IndexerError> {
1480 #[cfg(not(feature = "v2"))]
1481 unimplemented!("get_address_queue_with_proofs");
1482 #[cfg(feature = "v2")]
1483 {
1484 let merkle_tree_pubkey = _merkle_tree_pubkey;
1485 let limit = _zkp_batch_size;
1486 let start_queue_index = _start_offset;
1487 let config = _config.unwrap_or_default();
1488 self.retry(config.retry_config, || async {
1489 let merkle_tree = Hash::from_bytes(merkle_tree_pubkey.to_bytes().as_ref())?;
1490 let request = photon_api::models::GetBatchAddressUpdateInfoPostRequest {
1491 params: Box::new(
1492 photon_api::models::GetBatchAddressUpdateInfoPostRequestParams {
1493 limit,
1494 start_queue_index,
1495 tree: merkle_tree.to_base58(),
1496 },
1497 ),
1498 ..Default::default()
1499 };
1500
1501 let result = photon_api::apis::default_api::get_batch_address_update_info_post(
1502 &self.configuration,
1503 request,
1504 )
1505 .await?;
1506
1507 let api_response = Self::extract_result_with_error_check(
1508 "get_batch_address_update_info",
1509 result.error,
1510 result.result.map(|r| *r),
1511 )?;
1512 if api_response.context.slot < config.slot {
1513 return Err(IndexerError::IndexerNotSyncedToSlot);
1514 }
1515
1516 let addresses = api_response
1517 .addresses
1518 .iter()
1519 .map(|x| crate::indexer::AddressQueueIndex {
1520 address: Hash::from_base58(x.address.clone().as_ref()).unwrap(),
1521 queue_index: x.queue_index,
1522 })
1523 .collect();
1524
1525 let mut proofs: Vec<NewAddressProofWithContext> = vec![];
1526 for proof in api_response.non_inclusion_proofs {
1527 let proof = NewAddressProofWithContext {
1528 merkle_tree: *merkle_tree_pubkey,
1529 low_address_index: proof.low_element_leaf_index,
1530 low_address_value: Hash::from_base58(
1531 proof.lower_range_address.clone().as_ref(),
1532 )
1533 .unwrap(),
1534 low_address_next_index: proof.next_index,
1535 low_address_next_value: Hash::from_base58(
1536 proof.higher_range_address.clone().as_ref(),
1537 )
1538 .unwrap(),
1539 low_address_proof: proof
1540 .proof
1541 .iter()
1542 .map(|x| Hash::from_base58(x.clone().as_ref()).unwrap())
1543 .collect(),
1544 root: Hash::from_base58(proof.root.clone().as_ref()).unwrap(),
1545 root_seq: proof.root_seq,
1546
1547 new_low_element: None,
1548 new_element: None,
1549 new_element_next_value: None,
1550 };
1551 proofs.push(proof);
1552 }
1553
1554 let subtrees = api_response
1555 .subtrees
1556 .iter()
1557 .map(|x| {
1558 let mut arr = [0u8; 32];
1559 arr.copy_from_slice(x.as_slice());
1560 arr
1561 })
1562 .collect::<Vec<_>>();
1563
1564 let result = BatchAddressUpdateIndexerResponse {
1565 batch_start_index: api_response.start_index,
1566 addresses,
1567 non_inclusion_proofs: proofs,
1568 subtrees,
1569 };
1570 Ok(Response {
1571 context: Context {
1572 slot: api_response.context.slot,
1573 },
1574 value: result,
1575 })
1576 })
1577 .await
1578 }
1579 }
1580
1581 async fn get_queue_elements(
1582 &mut self,
1583 _pubkey: [u8; 32],
1584 _queue_type: QueueType,
1585 _num_elements: u16,
1586 _start_queue_index: Option<u64>,
1587 _config: Option<IndexerRpcConfig>,
1588 ) -> Result<Response<QueueElementsResult>, IndexerError> {
1589 #[cfg(not(feature = "v2"))]
1590 unimplemented!("get_queue_elements");
1591 #[cfg(feature = "v2")]
1592 {
1593 use super::MerkleProofWithContext;
1594 let pubkey = _pubkey;
1595 let queue_type = _queue_type;
1596 let limit = _num_elements;
1597 let start_queue_index = _start_queue_index;
1598 let config = _config.unwrap_or_default();
1599 self.retry(config.retry_config, || async {
1600 let request: photon_api::models::GetQueueElementsPostRequest =
1601 photon_api::models::GetQueueElementsPostRequest {
1602 params: Box::from(photon_api::models::GetQueueElementsPostRequestParams {
1603 tree: bs58::encode(pubkey).into_string(),
1604 queue_type: queue_type as u16,
1605 limit,
1606 start_queue_index,
1607 }),
1608 ..Default::default()
1609 };
1610
1611 let result = photon_api::apis::default_api::get_queue_elements_post(
1612 &self.configuration,
1613 request,
1614 )
1615 .await;
1616 let result: Result<Response<QueueElementsResult>, IndexerError> = match result {
1617 Ok(api_response) => match api_response.result {
1618 Some(api_result) => {
1619 if api_result.context.slot < config.slot {
1620 return Err(IndexerError::IndexerNotSyncedToSlot);
1621 }
1622 let response = api_result.value;
1623 let proofs: Vec<MerkleProofWithContext> = response
1624 .iter()
1625 .map(|x| {
1626 let proof = x
1627 .proof
1628 .iter()
1629 .map(|x| Hash::from_base58(x).unwrap())
1630 .collect();
1631 let root = Hash::from_base58(&x.root).unwrap();
1632 let leaf = Hash::from_base58(&x.leaf).unwrap();
1633 let merkle_tree = Hash::from_base58(&x.tree).unwrap();
1634 let tx_hash =
1635 x.tx_hash.as_ref().map(|x| Hash::from_base58(x).unwrap());
1636 let account_hash = Hash::from_base58(&x.account_hash).unwrap();
1637
1638 MerkleProofWithContext {
1639 proof,
1640 root,
1641 leaf_index: x.leaf_index,
1642 leaf,
1643 merkle_tree,
1644 root_seq: x.root_seq,
1645 tx_hash,
1646 account_hash,
1647 }
1648 })
1649 .collect();
1650
1651 Ok(Response {
1652 context: Context {
1653 slot: api_result.context.slot,
1654 },
1655 value: QueueElementsResult {
1656 elements: proofs,
1657 first_value_queue_index: Some(
1658 api_result.first_value_queue_index as u64,
1659 ),
1660 },
1661 })
1662 }
1663 None => {
1664 let error =
1665 api_response
1666 .error
1667 .ok_or_else(|| IndexerError::PhotonError {
1668 context: "get_queue_elements".to_string(),
1669 message: "No error details provided".to_string(),
1670 })?;
1671
1672 Err(IndexerError::PhotonError {
1673 context: "get_queue_elements".to_string(),
1674 message: error
1675 .message
1676 .unwrap_or_else(|| "Unknown error".to_string()),
1677 })
1678 }
1679 },
1680 Err(e) => Err(IndexerError::PhotonError {
1681 context: "get_queue_elements".to_string(),
1682 message: e.to_string(),
1683 }),
1684 };
1685
1686 result
1687 })
1688 .await
1689 }
1690 }
1691
1692 async fn get_subtrees(
1693 &self,
1694 _merkle_tree_pubkey: [u8; 32],
1695 _config: Option<IndexerRpcConfig>,
1696 ) -> Result<Response<Items<[u8; 32]>>, IndexerError> {
1697 #[cfg(not(feature = "v2"))]
1698 unimplemented!();
1699 #[cfg(feature = "v2")]
1700 {
1701 todo!();
1702 }
1703 }
1704}