1use std::{fmt::Debug, time::Duration};
2
3use async_trait::async_trait;
4use bs58;
5use photon_api::{
6 apis::configuration::{ApiKey, Configuration},
7 models::GetCompressedAccountsByOwnerPostRequestParams,
8};
9use solana_pubkey::Pubkey;
10use tracing::{error, trace, warn};
11
12use super::types::{
13 CompressedAccount, CompressedTokenAccount, OwnerBalance, SignatureWithMetadata, TokenBalance,
14};
15use crate::indexer::{
16 base58::Base58Conversions,
17 config::RetryConfig,
18 response::{Context, Items, ItemsWithCursor, Response},
19 Address, AddressWithTree, GetCompressedAccountsByOwnerConfig,
20 GetCompressedTokenAccountsByOwnerOrDelegateOptions, Hash, Indexer, IndexerError,
21 IndexerRpcConfig, MerkleProof, NewAddressProofWithContext, PaginatedOptions,
22};
23
24pub struct PhotonIndexer {
26 configuration: Configuration,
27}
28
29impl PhotonIndexer {
30 pub fn default_path() -> String {
31 "http://127.0.0.1:8784".to_string()
32 }
33}
34
35impl PhotonIndexer {
36 async fn retry<F, Fut, T>(
37 &self,
38 config: RetryConfig,
39 mut operation: F,
40 ) -> Result<T, IndexerError>
41 where
42 F: FnMut() -> Fut,
43 Fut: std::future::Future<Output = Result<T, IndexerError>>,
44 {
45 let max_retries = config.num_retries;
46 let mut attempts = 0;
47 let mut delay_ms = config.delay_ms;
48 let max_delay_ms = config.max_delay_ms;
49
50 loop {
51 attempts += 1;
52
53 trace!(
54 "Attempt {}/{}: No rate limiter configured",
55 attempts,
56 max_retries
57 );
58
59 trace!("Attempt {}/{}: Executing operation", attempts, max_retries);
60 let result = operation().await;
61
62 match result {
63 Ok(value) => {
64 trace!("Attempt {}/{}: Operation succeeded.", attempts, max_retries);
65 return Ok(value);
66 }
67 Err(e) => {
68 let is_retryable = match &e {
69 IndexerError::ApiError(_) => {
70 warn!("API Error: {}", e);
71 true
72 }
73 IndexerError::PhotonError {
74 context: _,
75 message: _,
76 } => {
77 warn!("Operation failed, checking if retryable...");
78 true
79 }
80 IndexerError::IndexerNotSyncedToSlot => true,
81 IndexerError::Base58DecodeError { .. } => false,
82 IndexerError::AccountNotFound => false,
83 IndexerError::InvalidParameters(_) => false,
84 IndexerError::NotImplemented(_) => false,
85 _ => false,
86 };
87
88 if is_retryable && attempts < max_retries {
89 warn!(
90 "Attempt {}/{}: Operation failed. Retrying",
91 attempts, max_retries
92 );
93
94 tokio::time::sleep(Duration::from_millis(delay_ms)).await;
95 delay_ms = std::cmp::min(delay_ms * 2, max_delay_ms);
96 } else {
97 if is_retryable {
98 error!("Operation failed after max retries.");
99 } else {
100 error!("Operation failed with non-retryable error.");
101 }
102 return Err(e);
103 }
104 }
105 }
106 }
107 }
108}
109
110impl PhotonIndexer {
111 pub fn new(path: String, api_key: Option<String>) -> Self {
112 let configuration = Configuration {
113 base_path: path,
114 api_key: api_key.map(|key| ApiKey {
115 prefix: Some("api-key".to_string()),
116 key,
117 }),
118 ..Default::default()
119 };
120
121 PhotonIndexer { configuration }
122 }
123
124 pub fn new_with_config(configuration: Configuration) -> Self {
125 PhotonIndexer { configuration }
126 }
127
128 fn extract_result<T>(context: &str, result: Option<T>) -> Result<T, IndexerError> {
129 result.ok_or_else(|| IndexerError::missing_result(context, "value not present"))
130 }
131
132 fn extract_result_with_error_check<T>(
133 context: &str,
134 error: Option<Box<photon_api::models::GetBatchAddressUpdateInfoPost200ResponseError>>,
135 result: Option<T>,
136 ) -> Result<T, IndexerError> {
137 if let Some(error) = error {
138 let error_message = error
139 .clone()
140 .message
141 .unwrap_or_else(|| format!("Unknown API error: {:?}", error).to_string());
142 return Err(IndexerError::ApiError(format!(
143 "API error in {} (code: {:?}): {}",
144 context, error.code, error_message
145 )));
146 }
147
148 Self::extract_result(context, result)
149 }
150
151 fn build_account_params(
152 &self,
153 address: Option<Address>,
154 hash: Option<Hash>,
155 ) -> Result<photon_api::models::GetCompressedAccountPostRequestParams, IndexerError> {
156 match (address, hash) {
157 (None, None) => Err(IndexerError::InvalidParameters(
158 "Either address or hash must be provided".to_string(),
159 )),
160 (Some(_), Some(_)) => Err(IndexerError::InvalidParameters(
161 "Only one of address or hash must be provided".to_string(),
162 )),
163 (address, hash) => Ok(photon_api::models::GetCompressedAccountPostRequestParams {
164 address: address.map(|x| x.to_base58()),
165 hash: hash.map(|x| x.to_base58()),
166 }),
167 }
168 }
169}
170
171impl Debug for PhotonIndexer {
172 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
173 f.debug_struct("PhotonIndexer")
174 .field("configuration", &self.configuration)
175 .finish()
176 }
177}
178
179#[async_trait]
180impl Indexer for PhotonIndexer {
181 async fn get_compressed_account(
182 &self,
183 address: Address,
184 config: Option<IndexerRpcConfig>,
185 ) -> Result<Response<Option<CompressedAccount>>, IndexerError> {
186 let config = config.unwrap_or_default();
187 self.retry(config.retry_config, || async {
188 let params = self.build_account_params(Some(address), None)?;
189 let request = photon_api::models::GetCompressedAccountPostRequest {
190 params: Box::new(params),
191 ..Default::default()
192 };
193
194 let result = photon_api::apis::default_api::get_compressed_account_post(
195 &self.configuration,
196 request,
197 )
198 .await?;
199 let api_response = Self::extract_result_with_error_check(
200 "get_compressed_account",
201 result.error,
202 result.result.map(|r| *r),
203 )?;
204 if api_response.context.slot < config.slot {
205 return Err(IndexerError::IndexerNotSyncedToSlot);
206 }
207 let account = match api_response.value {
208 Some(boxed) => Some(CompressedAccount::try_from(&*boxed)?),
209 None => None,
210 };
211
212 Ok(Response {
213 context: Context {
214 slot: api_response.context.slot,
215 },
216 value: account,
217 })
218 })
219 .await
220 }
221
222 async fn get_compressed_account_by_hash(
223 &self,
224 hash: Hash,
225 config: Option<IndexerRpcConfig>,
226 ) -> Result<Response<Option<CompressedAccount>>, IndexerError> {
227 let config = config.unwrap_or_default();
228 self.retry(config.retry_config, || async {
229 let params = self.build_account_params(None, Some(hash))?;
230 let request = photon_api::models::GetCompressedAccountPostRequest {
231 params: Box::new(params),
232 ..Default::default()
233 };
234
235 let result = photon_api::apis::default_api::get_compressed_account_post(
236 &self.configuration,
237 request,
238 )
239 .await?;
240 let api_response = Self::extract_result_with_error_check(
241 "get_compressed_account_by_hash",
242 result.error,
243 result.result.map(|r| *r),
244 )?;
245 if api_response.context.slot < config.slot {
246 return Err(IndexerError::IndexerNotSyncedToSlot);
247 }
248 let account = match api_response.value {
249 Some(boxed) => Some(CompressedAccount::try_from(&*boxed)?),
250 None => None,
251 };
252
253 Ok(Response {
254 context: Context {
255 slot: api_response.context.slot,
256 },
257 value: account,
258 })
259 })
260 .await
261 }
262
263 async fn get_compressed_accounts_by_owner(
264 &self,
265 owner: &Pubkey,
266 options: Option<GetCompressedAccountsByOwnerConfig>,
267 config: Option<IndexerRpcConfig>,
268 ) -> Result<Response<ItemsWithCursor<CompressedAccount>>, IndexerError> {
269 let config = config.unwrap_or_default();
270 self.retry(config.retry_config, || async {
271 #[cfg(feature = "v2")]
272 {
273 let request = photon_api::models::GetCompressedAccountsByOwnerV2PostRequest {
274 params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
275 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
276 data_slice: options.as_ref().and_then(|o| {
277 o.data_slice.as_ref().map(|ds| {
278 Box::new(photon_api::models::DataSlice {
279 length: ds.length as u32,
280 offset: ds.offset as u32,
281 })
282 })
283 }),
284 filters: options.as_ref().and_then(|o| o.filters_to_photon()),
285 limit: options.as_ref().and_then(|o| o.limit),
286 owner: owner.to_string(),
287 }),
288 ..Default::default()
289 };
290 let result =
291 photon_api::apis::default_api::get_compressed_accounts_by_owner_v2_post(
292 &self.configuration,
293 request,
294 )
295 .await?;
296 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
297 if response.context.slot < config.slot {
298 return Err(IndexerError::IndexerNotSyncedToSlot);
299 }
300 let accounts: Result<Vec<_>, _> = response
301 .value
302 .items
303 .iter()
304 .map(CompressedAccount::try_from)
305 .collect();
306
307 let cursor = response.value.cursor;
308
309 Ok(Response {
310 context: Context {
311 slot: response.context.slot,
312 },
313 value: ItemsWithCursor {
314 items: accounts?,
315 cursor,
316 },
317 })
318 }
319 #[cfg(not(feature = "v2"))]
320 {
321 let request = photon_api::models::GetCompressedAccountsByOwnerPostRequest {
322 params: Box::from(GetCompressedAccountsByOwnerPostRequestParams {
323 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
324 data_slice: options.as_ref().and_then(|o| {
325 o.data_slice.as_ref().map(|ds| {
326 Box::new(photon_api::models::DataSlice {
327 length: ds.length as u32,
328 offset: ds.offset as u32,
329 })
330 })
331 }),
332 filters: options.as_ref().and_then(|o| o.filters_to_photon()),
333 limit: options.as_ref().and_then(|o| o.limit),
334 owner: owner.to_string(),
335 }),
336 ..Default::default()
337 };
338 let result = photon_api::apis::default_api::get_compressed_accounts_by_owner_post(
339 &self.configuration,
340 request,
341 )
342 .await?;
343 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
344 if response.context.slot < config.slot {
345 return Err(IndexerError::IndexerNotSyncedToSlot);
346 }
347 let accounts: Result<Vec<_>, _> = response
348 .value
349 .items
350 .iter()
351 .map(CompressedAccount::try_from)
352 .collect();
353
354 let cursor = response.value.cursor;
355
356 Ok(Response {
357 context: Context {
358 slot: response.context.slot,
359 },
360 value: ItemsWithCursor {
361 items: accounts?,
362 cursor,
363 },
364 })
365 }
366 })
367 .await
368 }
369
370 async fn get_compressed_balance(
371 &self,
372 address: Option<Address>,
373 hash: Option<Hash>,
374 config: Option<IndexerRpcConfig>,
375 ) -> Result<Response<u64>, IndexerError> {
376 let config = config.unwrap_or_default();
377 self.retry(config.retry_config, || async {
378 let params = self.build_account_params(address, hash)?;
379 let request = photon_api::models::GetCompressedAccountBalancePostRequest {
380 params: Box::new(params),
381 ..Default::default()
382 };
383
384 let result = photon_api::apis::default_api::get_compressed_account_balance_post(
385 &self.configuration,
386 request,
387 )
388 .await?;
389
390 let api_response = Self::extract_result_with_error_check(
391 "get_compressed_account_balance",
392 result.error,
393 result.result.map(|r| *r),
394 )?;
395 if api_response.context.slot < config.slot {
396 return Err(IndexerError::IndexerNotSyncedToSlot);
397 }
398 Ok(Response {
399 context: Context {
400 slot: api_response.context.slot,
401 },
402 value: api_response.value,
403 })
404 })
405 .await
406 }
407
408 async fn get_compressed_balance_by_owner(
409 &self,
410 owner: &Pubkey,
411 config: Option<IndexerRpcConfig>,
412 ) -> Result<Response<u64>, IndexerError> {
413 let config = config.unwrap_or_default();
414 self.retry(config.retry_config, || async {
415 let request = photon_api::models::GetCompressedBalanceByOwnerPostRequest {
416 params: Box::new(
417 photon_api::models::GetCompressedBalanceByOwnerPostRequestParams {
418 owner: owner.to_string(),
419 },
420 ),
421 ..Default::default()
422 };
423
424 let result = photon_api::apis::default_api::get_compressed_balance_by_owner_post(
425 &self.configuration,
426 request,
427 )
428 .await?;
429
430 let api_response = Self::extract_result_with_error_check(
431 "get_compressed_balance_by_owner",
432 result.error,
433 result.result.map(|r| *r),
434 )?;
435 if api_response.context.slot < config.slot {
436 return Err(IndexerError::IndexerNotSyncedToSlot);
437 }
438 Ok(Response {
439 context: Context {
440 slot: api_response.context.slot,
441 },
442 value: api_response.value,
443 })
444 })
445 .await
446 }
447
448 async fn get_compressed_mint_token_holders(
449 &self,
450 mint: &Pubkey,
451 options: Option<PaginatedOptions>,
452 config: Option<IndexerRpcConfig>,
453 ) -> Result<Response<ItemsWithCursor<OwnerBalance>>, IndexerError> {
454 let config = config.unwrap_or_default();
455 self.retry(config.retry_config, || async {
456 let request = photon_api::models::GetCompressedMintTokenHoldersPostRequest {
457 params: Box::new(
458 photon_api::models::GetCompressedMintTokenHoldersPostRequestParams {
459 mint: mint.to_string(),
460 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
461 limit: options.as_ref().and_then(|o| o.limit),
462 },
463 ),
464 ..Default::default()
465 };
466
467 let result = photon_api::apis::default_api::get_compressed_mint_token_holders_post(
468 &self.configuration,
469 request,
470 )
471 .await?;
472
473 let api_response = Self::extract_result_with_error_check(
474 "get_compressed_mint_token_holders",
475 result.error,
476 result.result.map(|r| *r),
477 )?;
478 if api_response.context.slot < config.slot {
479 return Err(IndexerError::IndexerNotSyncedToSlot);
480 }
481 let owner_balances: Result<Vec<_>, _> = api_response
482 .value
483 .items
484 .iter()
485 .map(OwnerBalance::try_from)
486 .collect();
487
488 let cursor = api_response.value.cursor;
489
490 Ok(Response {
491 context: Context {
492 slot: api_response.context.slot,
493 },
494 value: ItemsWithCursor {
495 items: owner_balances?,
496 cursor,
497 },
498 })
499 })
500 .await
501 }
502
503 async fn get_compressed_token_account_balance(
504 &self,
505 address: Option<Address>,
506 hash: Option<Hash>,
507 config: Option<IndexerRpcConfig>,
508 ) -> Result<Response<u64>, IndexerError> {
509 let config = config.unwrap_or_default();
510 self.retry(config.retry_config, || async {
511 let request = photon_api::models::GetCompressedTokenAccountBalancePostRequest {
512 params: Box::new(photon_api::models::GetCompressedAccountPostRequestParams {
513 address: address.map(|x| x.to_base58()),
514 hash: hash.map(|x| x.to_base58()),
515 }),
516 ..Default::default()
517 };
518
519 let result = photon_api::apis::default_api::get_compressed_token_account_balance_post(
520 &self.configuration,
521 request,
522 )
523 .await?;
524
525 let api_response = Self::extract_result_with_error_check(
526 "get_compressed_token_account_balance",
527 result.error,
528 result.result.map(|r| *r),
529 )?;
530 if api_response.context.slot < config.slot {
531 return Err(IndexerError::IndexerNotSyncedToSlot);
532 }
533 Ok(Response {
534 context: Context {
535 slot: api_response.context.slot,
536 },
537 value: api_response.value.amount,
538 })
539 })
540 .await
541 }
542
543 async fn get_compressed_token_accounts_by_delegate(
544 &self,
545 delegate: &Pubkey,
546 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
547 config: Option<IndexerRpcConfig>,
548 ) -> Result<Response<ItemsWithCursor<CompressedTokenAccount>>, IndexerError> {
549 let config = config.unwrap_or_default();
550 self.retry(config.retry_config, || async {
551 #[cfg(feature = "v2")]
552 {
553 let request = photon_api::models::GetCompressedTokenAccountsByDelegateV2PostRequest {
554 params: Box::new(
555 photon_api::models::GetCompressedTokenAccountsByDelegatePostRequestParams {
556 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
557 limit: options.as_ref().and_then(|o| o.limit),
558 mint: options.as_ref().and_then(|o| o.mint.as_ref()).map(|x| x.to_string()),
559 delegate: delegate.to_string(),
560 },
561 ),
562 ..Default::default()
563 };
564
565 let result = photon_api::apis::default_api::get_compressed_token_accounts_by_delegate_v2_post(
566 &self.configuration,
567 request,
568 )
569 .await?;
570
571 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
572 if response.context.slot < config.slot {
573 return Err(IndexerError::IndexerNotSyncedToSlot);
574 }
575
576 let token_accounts: Result<Vec<_>, _> = response
577 .value
578 .items
579 .iter()
580 .map(CompressedTokenAccount::try_from)
581 .collect();
582
583 let cursor = response.value.cursor;
584
585 Ok(Response {
586 context: Context {
587 slot: response.context.slot,
588 },
589 value: ItemsWithCursor {
590 items: token_accounts?,
591 cursor,
592 },
593 })
594 }
595 #[cfg(not(feature = "v2"))]
596 {
597 let request = photon_api::models::GetCompressedTokenAccountsByDelegatePostRequest {
598 params: Box::new(
599 photon_api::models::GetCompressedTokenAccountsByDelegatePostRequestParams {
600 delegate: delegate.to_string(),
601 mint: options.as_ref().and_then(|o| o.mint.as_ref()).map(|x| x.to_string()),
602 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
603 limit: options.as_ref().and_then(|o| o.limit),
604 },
605 ),
606 ..Default::default()
607 };
608
609 let result = photon_api::apis::default_api::get_compressed_token_accounts_by_delegate_post(
610 &self.configuration,
611 request,
612 )
613 .await?;
614
615 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
616 if response.context.slot < config.slot {
617 return Err(IndexerError::IndexerNotSyncedToSlot);
618 }
619
620 let token_accounts: Result<Vec<_>, _> = response
621 .value
622 .items
623 .iter()
624 .map(CompressedTokenAccount::try_from)
625 .collect();
626
627 let cursor = response.value.cursor;
628
629 Ok(Response {
630 context: Context {
631 slot: response.context.slot,
632 },
633 value: ItemsWithCursor {
634 items: token_accounts?,
635 cursor,
636 },
637 })
638 }
639 })
640 .await
641 }
642
643 async fn get_compressed_token_accounts_by_owner(
644 &self,
645 owner: &Pubkey,
646 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
647 config: Option<IndexerRpcConfig>,
648 ) -> Result<Response<ItemsWithCursor<CompressedTokenAccount>>, IndexerError> {
649 let config = config.unwrap_or_default();
650 self.retry(config.retry_config, || async {
651 #[cfg(feature = "v2")]
652 {
653 let request = photon_api::models::GetCompressedTokenAccountsByOwnerV2PostRequest {
654 params: Box::from(
655 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
656 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
657 limit: options.as_ref().and_then(|o| o.limit),
658 mint: options
659 .as_ref()
660 .and_then(|o| o.mint.as_ref())
661 .map(|x| x.to_string()),
662 owner: owner.to_string(),
663 },
664 ),
665 ..Default::default()
666 };
667 let result =
668 photon_api::apis::default_api::get_compressed_token_accounts_by_owner_v2_post(
669 &self.configuration,
670 request,
671 )
672 .await?;
673 let response = result.result.ok_or(IndexerError::AccountNotFound)?;
674 if response.context.slot < config.slot {
675 return Err(IndexerError::IndexerNotSyncedToSlot);
676 }
677 let token_accounts: Result<Vec<_>, _> = response
678 .value
679 .items
680 .iter()
681 .map(CompressedTokenAccount::try_from)
682 .collect();
683
684 let cursor = response.value.cursor;
685
686 Ok(Response {
687 context: Context {
688 slot: response.context.slot,
689 },
690 value: ItemsWithCursor {
691 items: token_accounts?,
692 cursor,
693 },
694 })
695 }
696 #[cfg(not(feature = "v2"))]
697 {
698 let request = photon_api::models::GetCompressedTokenAccountsByOwnerPostRequest {
699 params: Box::new(
700 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
701 owner: owner.to_string(),
702 mint: options
703 .as_ref()
704 .and_then(|o| o.mint.as_ref())
705 .map(|x| x.to_string()),
706 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
707 limit: options.as_ref().and_then(|o| o.limit),
708 },
709 ),
710 ..Default::default()
711 };
712
713 let result =
714 photon_api::apis::default_api::get_compressed_token_accounts_by_owner_post(
715 &self.configuration,
716 request,
717 )
718 .await?;
719
720 let response = Self::extract_result_with_error_check(
721 "get_compressed_token_accounts_by_owner",
722 result.error,
723 result.result.map(|r| *r),
724 )?;
725 if response.context.slot < config.slot {
726 return Err(IndexerError::IndexerNotSyncedToSlot);
727 }
728 let token_accounts: Result<Vec<_>, _> = response
729 .value
730 .items
731 .iter()
732 .map(CompressedTokenAccount::try_from)
733 .collect();
734
735 let cursor = response.value.cursor;
736
737 Ok(Response {
738 context: Context {
739 slot: response.context.slot,
740 },
741 value: ItemsWithCursor {
742 items: token_accounts?,
743 cursor,
744 },
745 })
746 }
747 })
748 .await
749 }
750
751 async fn get_compressed_token_balances_by_owner_v2(
752 &self,
753 owner: &Pubkey,
754 options: Option<GetCompressedTokenAccountsByOwnerOrDelegateOptions>,
755 config: Option<IndexerRpcConfig>,
756 ) -> Result<Response<ItemsWithCursor<TokenBalance>>, IndexerError> {
757 let config = config.unwrap_or_default();
758 self.retry(config.retry_config, || async {
759 #[cfg(feature = "v2")]
760 {
761 let request = photon_api::models::GetCompressedTokenBalancesByOwnerV2PostRequest {
762 params: Box::new(
763 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
764 owner: owner.to_string(),
765 mint: options
766 .as_ref()
767 .and_then(|o| o.mint.as_ref())
768 .map(|x| x.to_string()),
769 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
770 limit: options.as_ref().and_then(|o| o.limit),
771 },
772 ),
773 ..Default::default()
774 };
775
776 let result =
777 photon_api::apis::default_api::get_compressed_token_balances_by_owner_v2_post(
778 &self.configuration,
779 request,
780 )
781 .await?;
782
783 let api_response = Self::extract_result_with_error_check(
784 "get_compressed_token_balances_by_owner_v2",
785 result.error,
786 result.result.map(|r| *r),
787 )?;
788 if api_response.context.slot < config.slot {
789 return Err(IndexerError::IndexerNotSyncedToSlot);
790 }
791
792 let token_balances: Result<Vec<_>, _> = api_response
793 .value
794 .items
795 .iter()
796 .map(TokenBalance::try_from)
797 .collect();
798
799 Ok(Response {
800 context: Context {
801 slot: api_response.context.slot,
802 },
803 value: ItemsWithCursor {
804 items: token_balances?,
805 cursor: api_response.value.cursor,
806 },
807 })
808 }
809 #[cfg(not(feature = "v2"))]
810 {
811 let request = photon_api::models::GetCompressedTokenBalancesByOwnerPostRequest {
812 params: Box::new(
813 photon_api::models::GetCompressedTokenAccountsByOwnerPostRequestParams {
814 owner: owner.to_string(),
815 mint: options
816 .as_ref()
817 .and_then(|o| o.mint.as_ref())
818 .map(|x| x.to_string()),
819 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
820 limit: options.as_ref().and_then(|o| o.limit),
821 },
822 ),
823 ..Default::default()
824 };
825
826 let result =
827 photon_api::apis::default_api::get_compressed_token_balances_by_owner_post(
828 &self.configuration,
829 request,
830 )
831 .await?;
832
833 let api_response = Self::extract_result_with_error_check(
834 "get_compressed_token_balances_by_owner",
835 result.error,
836 result.result.map(|r| *r),
837 )?;
838 if api_response.context.slot < config.slot {
839 return Err(IndexerError::IndexerNotSyncedToSlot);
840 }
841
842 let token_balances: Result<Vec<_>, _> = api_response
843 .value
844 .token_balances
845 .iter()
846 .map(TokenBalance::try_from)
847 .collect();
848
849 Ok(Response {
850 context: Context {
851 slot: api_response.context.slot,
852 },
853 value: ItemsWithCursor {
854 items: token_balances?,
855 cursor: api_response.value.cursor,
856 },
857 })
858 }
859 })
860 .await
861 }
862
863 async fn get_compression_signatures_for_account(
864 &self,
865 hash: Hash,
866 config: Option<IndexerRpcConfig>,
867 ) -> Result<Response<Items<SignatureWithMetadata>>, IndexerError> {
868 let config = config.unwrap_or_default();
869 self.retry(config.retry_config, || async {
870 let request = photon_api::models::GetCompressionSignaturesForAccountPostRequest {
871 params: Box::new(
872 photon_api::models::GetCompressedAccountProofPostRequestParams {
873 hash: hash.to_base58(),
874 },
875 ),
876 ..Default::default()
877 };
878
879 let result =
880 photon_api::apis::default_api::get_compression_signatures_for_account_post(
881 &self.configuration,
882 request,
883 )
884 .await?;
885
886 let api_response = Self::extract_result_with_error_check(
887 "get_compression_signatures_for_account",
888 result.error,
889 result.result.map(|r| *r),
890 )?;
891 if api_response.context.slot < config.slot {
892 return Err(IndexerError::IndexerNotSyncedToSlot);
893 }
894 let signatures = api_response
895 .value
896 .items
897 .iter()
898 .map(SignatureWithMetadata::try_from)
899 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
900
901 Ok(Response {
902 context: Context {
903 slot: api_response.context.slot,
904 },
905 value: Items { items: signatures },
906 })
907 })
908 .await
909 }
910
911 async fn get_compression_signatures_for_address(
912 &self,
913 address: &[u8; 32],
914 options: Option<PaginatedOptions>,
915 config: Option<IndexerRpcConfig>,
916 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
917 let config = config.unwrap_or_default();
918 self.retry(config.retry_config, || async {
919 let request = photon_api::models::GetCompressionSignaturesForAddressPostRequest {
920 params: Box::new(
921 photon_api::models::GetCompressionSignaturesForAddressPostRequestParams {
922 address: address.to_base58(),
923 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
924 limit: options.as_ref().and_then(|o| o.limit),
925 },
926 ),
927 ..Default::default()
928 };
929
930 let result =
931 photon_api::apis::default_api::get_compression_signatures_for_address_post(
932 &self.configuration,
933 request,
934 )
935 .await?;
936
937 let api_response = Self::extract_result_with_error_check(
938 "get_compression_signatures_for_address",
939 result.error,
940 result.result.map(|r| *r),
941 )?;
942 if api_response.context.slot < config.slot {
943 return Err(IndexerError::IndexerNotSyncedToSlot);
944 }
945
946 let signatures = api_response
947 .value
948 .items
949 .iter()
950 .map(SignatureWithMetadata::try_from)
951 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
952
953 let cursor = api_response.value.cursor;
954
955 Ok(Response {
956 context: Context {
957 slot: api_response.context.slot,
958 },
959 value: ItemsWithCursor {
960 items: signatures,
961 cursor,
962 },
963 })
964 })
965 .await
966 }
967
968 async fn get_compression_signatures_for_owner(
969 &self,
970 owner: &Pubkey,
971 options: Option<PaginatedOptions>,
972 config: Option<IndexerRpcConfig>,
973 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
974 let config = config.unwrap_or_default();
975 self.retry(config.retry_config, || async {
976 let request = photon_api::models::GetCompressionSignaturesForOwnerPostRequest {
977 params: Box::new(
978 photon_api::models::GetCompressionSignaturesForOwnerPostRequestParams {
979 owner: owner.to_string(),
980 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
981 limit: options.as_ref().and_then(|o| o.limit),
982 },
983 ),
984 ..Default::default()
985 };
986
987 let result = photon_api::apis::default_api::get_compression_signatures_for_owner_post(
988 &self.configuration,
989 request,
990 )
991 .await?;
992
993 let api_response = Self::extract_result_with_error_check(
994 "get_compression_signatures_for_owner",
995 result.error,
996 result.result.map(|r| *r),
997 )?;
998 if api_response.context.slot < config.slot {
999 return Err(IndexerError::IndexerNotSyncedToSlot);
1000 }
1001
1002 let signatures = api_response
1003 .value
1004 .items
1005 .iter()
1006 .map(SignatureWithMetadata::try_from)
1007 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
1008
1009 let cursor = api_response.value.cursor;
1010
1011 Ok(Response {
1012 context: Context {
1013 slot: api_response.context.slot,
1014 },
1015 value: ItemsWithCursor {
1016 items: signatures,
1017 cursor,
1018 },
1019 })
1020 })
1021 .await
1022 }
1023
1024 async fn get_compression_signatures_for_token_owner(
1025 &self,
1026 owner: &Pubkey,
1027 options: Option<PaginatedOptions>,
1028 config: Option<IndexerRpcConfig>,
1029 ) -> Result<Response<ItemsWithCursor<SignatureWithMetadata>>, IndexerError> {
1030 let config = config.unwrap_or_default();
1031 self.retry(config.retry_config, || async {
1032 let request = photon_api::models::GetCompressionSignaturesForTokenOwnerPostRequest {
1033 params: Box::new(
1034 photon_api::models::GetCompressionSignaturesForOwnerPostRequestParams {
1035 owner: owner.to_string(),
1036 cursor: options.as_ref().and_then(|o| o.cursor.clone()),
1037 limit: options.as_ref().and_then(|o| o.limit),
1038 },
1039 ),
1040 ..Default::default()
1041 };
1042
1043 let result =
1044 photon_api::apis::default_api::get_compression_signatures_for_token_owner_post(
1045 &self.configuration,
1046 request,
1047 )
1048 .await?;
1049
1050 let api_response = Self::extract_result_with_error_check(
1051 "get_compression_signatures_for_token_owner",
1052 result.error,
1053 result.result.map(|r| *r),
1054 )?;
1055 if api_response.context.slot < config.slot {
1056 return Err(IndexerError::IndexerNotSyncedToSlot);
1057 }
1058
1059 let signatures = api_response
1060 .value
1061 .items
1062 .iter()
1063 .map(SignatureWithMetadata::try_from)
1064 .collect::<Result<Vec<SignatureWithMetadata>, IndexerError>>()?;
1065
1066 let cursor = api_response.value.cursor;
1067
1068 Ok(Response {
1069 context: Context {
1070 slot: api_response.context.slot,
1071 },
1072 value: ItemsWithCursor {
1073 items: signatures,
1074 cursor,
1075 },
1076 })
1077 })
1078 .await
1079 }
1080
1081 async fn get_indexer_health(&self, config: Option<RetryConfig>) -> Result<bool, IndexerError> {
1082 let config = config.unwrap_or_default();
1083 self.retry(config, || async {
1084 let request = photon_api::models::GetIndexerHealthPostRequest {
1085 ..Default::default()
1086 };
1087
1088 let result = photon_api::apis::default_api::get_indexer_health_post(
1089 &self.configuration,
1090 request,
1091 )
1092 .await?;
1093
1094 let _api_response = Self::extract_result_with_error_check(
1095 "get_indexer_health",
1096 result.error,
1097 result.result,
1098 )?;
1099
1100 Ok(true)
1101 })
1102 .await
1103 }
1104
1105 async fn get_indexer_slot(&self, config: Option<RetryConfig>) -> Result<u64, IndexerError> {
1106 let config = config.unwrap_or_default();
1107 self.retry(config, || async {
1108 let request = photon_api::models::GetIndexerSlotPostRequest {
1109 ..Default::default()
1110 };
1111
1112 let result =
1113 photon_api::apis::default_api::get_indexer_slot_post(&self.configuration, request)
1114 .await?;
1115
1116 let result = Self::extract_result_with_error_check(
1117 "get_indexer_slot",
1118 result.error,
1119 result.result,
1120 )?;
1121 Ok(result)
1122 })
1123 .await
1124 }
1125
1126 async fn get_multiple_compressed_account_proofs(
1127 &self,
1128 hashes: Vec<[u8; 32]>,
1129 config: Option<IndexerRpcConfig>,
1130 ) -> Result<Response<Items<MerkleProof>>, IndexerError> {
1131 let config = config.unwrap_or_default();
1132 self.retry(config.retry_config, || async {
1133 let hashes_for_async = hashes.clone();
1134
1135 let request: photon_api::models::GetMultipleCompressedAccountProofsPostRequest =
1136 photon_api::models::GetMultipleCompressedAccountProofsPostRequest {
1137 params: hashes_for_async
1138 .into_iter()
1139 .map(|hash| bs58::encode(hash).into_string())
1140 .collect(),
1141 ..Default::default()
1142 };
1143
1144 let result =
1145 photon_api::apis::default_api::get_multiple_compressed_account_proofs_post(
1146 &self.configuration,
1147 request,
1148 )
1149 .await?;
1150
1151 if let Some(error) = &result.error {
1152 let error_msg = error.message.as_deref().unwrap_or("Unknown error");
1153 let error_code = error.code.unwrap_or(0);
1154 tracing::error!("API returned error: {}", error_msg);
1155 return Err(IndexerError::PhotonError {
1156 context: "get_multiple_compressed_account_proofs".to_string(),
1157 message: format!("API Error (code {}): {}", error_code, error_msg),
1158 });
1159 }
1160
1161 let photon_proofs = result.result.ok_or_else(|| {
1162 IndexerError::missing_result(
1163 "get_multiple_new_address_proofs",
1164 "No result returned from Photon API",
1165 )
1166 })?;
1167 if photon_proofs.context.slot < config.slot {
1168 return Err(IndexerError::IndexerNotSyncedToSlot);
1169 }
1170
1171 let proofs = photon_proofs
1172 .value
1173 .iter()
1174 .map(|x| {
1175 let mut proof_vec = x.proof.clone();
1176 proof_vec.truncate(proof_vec.len() - 10); let proof = proof_vec
1179 .iter()
1180 .map(|x| Hash::from_base58(x))
1181 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()
1182 .map_err(|e| IndexerError::Base58DecodeError {
1183 field: "proof".to_string(),
1184 message: e.to_string(),
1185 })?;
1186
1187 Ok(MerkleProof {
1188 hash: <[u8; 32] as Base58Conversions>::from_base58(&x.hash)?,
1189 leaf_index: x.leaf_index,
1190 merkle_tree: Pubkey::from_str_const(x.merkle_tree.as_str()),
1191 proof,
1192 root_seq: x.root_seq,
1193 root: <[u8; 32] as Base58Conversions>::from_base58(&x.root)?,
1194 })
1195 })
1196 .collect::<Result<Vec<MerkleProof>, IndexerError>>()?;
1197
1198 Ok(Response {
1199 context: Context {
1200 slot: photon_proofs.context.slot,
1201 },
1202 value: Items { items: proofs },
1203 })
1204 })
1205 .await
1206 }
1207
1208 async fn get_multiple_compressed_accounts(
1209 &self,
1210 addresses: Option<Vec<Address>>,
1211 hashes: Option<Vec<Hash>>,
1212 config: Option<IndexerRpcConfig>,
1213 ) -> Result<Response<Items<Option<CompressedAccount>>>, IndexerError> {
1214 let config = config.unwrap_or_default();
1215 self.retry(config.retry_config, || async {
1216 let hashes = hashes.clone();
1217 let addresses = addresses.clone();
1218 let request = photon_api::models::GetMultipleCompressedAccountsPostRequest {
1219 params: Box::new(
1220 photon_api::models::GetMultipleCompressedAccountsPostRequestParams {
1221 addresses: addresses.map(|x| x.iter().map(|x| x.to_base58()).collect()),
1222 hashes: hashes.map(|x| x.iter().map(|x| x.to_base58()).collect()),
1223 },
1224 ),
1225 ..Default::default()
1226 };
1227
1228 let result = photon_api::apis::default_api::get_multiple_compressed_accounts_post(
1229 &self.configuration,
1230 request,
1231 )
1232 .await?;
1233
1234 let api_response = Self::extract_result_with_error_check(
1235 "get_multiple_compressed_accounts",
1236 result.error,
1237 result.result.map(|r| *r),
1238 )?;
1239 if api_response.context.slot < config.slot {
1240 return Err(IndexerError::IndexerNotSyncedToSlot);
1241 }
1242 let accounts = api_response
1243 .value
1244 .items
1245 .iter()
1246 .map(|account_opt| match account_opt {
1247 Some(account) => CompressedAccount::try_from(account).map(Some),
1248 None => Ok(None),
1249 })
1250 .collect::<Result<Vec<Option<CompressedAccount>>, IndexerError>>()?;
1251
1252 Ok(Response {
1253 context: Context {
1254 slot: api_response.context.slot,
1255 },
1256 value: Items { items: accounts },
1257 })
1258 })
1259 .await
1260 }
1261
1262 async fn get_multiple_new_address_proofs(
1263 &self,
1264 merkle_tree_pubkey: [u8; 32],
1265 addresses: Vec<[u8; 32]>,
1266 config: Option<IndexerRpcConfig>,
1267 ) -> Result<Response<Items<NewAddressProofWithContext>>, IndexerError> {
1268 let config = config.unwrap_or_default();
1269 self.retry(config.retry_config, || async {
1270 let params: Vec<photon_api::models::address_with_tree::AddressWithTree> = addresses
1271 .iter()
1272 .map(|x| photon_api::models::address_with_tree::AddressWithTree {
1273 address: bs58::encode(x).into_string(),
1274 tree: bs58::encode(&merkle_tree_pubkey).into_string(),
1275 })
1276 .collect();
1277
1278 let request = photon_api::models::GetMultipleNewAddressProofsV2PostRequest {
1279 params,
1280 ..Default::default()
1281 };
1282
1283 let result = photon_api::apis::default_api::get_multiple_new_address_proofs_v2_post(
1284 &self.configuration,
1285 request,
1286 )
1287 .await;
1288
1289 let result = result?;
1290
1291 let api_response = match Self::extract_result_with_error_check(
1292 "get_multiple_new_address_proofs",
1293 result.error,
1294 result.result.map(|r| *r),
1295 ) {
1296 Ok(proofs) => proofs,
1297 Err(e) => {
1298 error!("Failed to extract proofs: {:?}", e);
1299 return Err(e);
1300 }
1301 };
1302 if api_response.context.slot < config.slot {
1303 return Err(IndexerError::IndexerNotSyncedToSlot);
1304 }
1305 let photon_proofs = api_response.value;
1306 let mut proofs = Vec::new();
1307 for photon_proof in photon_proofs {
1308 let tree_pubkey = Hash::from_base58(&photon_proof.merkle_tree).map_err(|e| {
1309 IndexerError::Base58DecodeError {
1310 field: "merkle_tree".to_string(),
1311 message: e.to_string(),
1312 }
1313 })?;
1314
1315 let low_address_value = Hash::from_base58(&photon_proof.lower_range_address)
1316 .map_err(|e| IndexerError::Base58DecodeError {
1317 field: "lower_range_address".to_string(),
1318 message: e.to_string(),
1319 })?;
1320
1321 let next_address_value = Hash::from_base58(&photon_proof.higher_range_address)
1322 .map_err(|e| IndexerError::Base58DecodeError {
1323 field: "higher_range_address".to_string(),
1324 message: e.to_string(),
1325 })?;
1326
1327 let mut proof_vec: Vec<[u8; 32]> = photon_proof
1328 .proof
1329 .iter()
1330 .map(|x: &String| Hash::from_base58(x))
1331 .collect::<Result<Vec<[u8; 32]>, IndexerError>>()?;
1332
1333 proof_vec.truncate(proof_vec.len() - 10); let mut proof_arr = [[0u8; 32]; 16];
1335 proof_arr.copy_from_slice(&proof_vec);
1336
1337 let root = Hash::from_base58(&photon_proof.root).map_err(|e| {
1338 IndexerError::Base58DecodeError {
1339 field: "root".to_string(),
1340 message: e.to_string(),
1341 }
1342 })?;
1343
1344 let proof = NewAddressProofWithContext {
1345 merkle_tree: tree_pubkey.into(),
1346 low_address_index: photon_proof.low_element_leaf_index,
1347 low_address_value,
1348 low_address_next_index: photon_proof.next_index,
1349 low_address_next_value: next_address_value,
1350 low_address_proof: proof_arr.to_vec(),
1351 root,
1352 root_seq: photon_proof.root_seq,
1353 new_low_element: None,
1354 new_element: None,
1355 new_element_next_value: None,
1356 };
1357 proofs.push(proof);
1358 }
1359
1360 Ok(Response {
1361 context: Context {
1362 slot: api_response.context.slot,
1363 },
1364 value: Items { items: proofs },
1365 })
1366 })
1367 .await
1368 }
1369
1370 async fn get_validity_proof(
1371 &self,
1372 hashes: Vec<Hash>,
1373 new_addresses_with_trees: Vec<AddressWithTree>,
1374 config: Option<IndexerRpcConfig>,
1375 ) -> Result<Response<super::types::ValidityProofWithContext>, IndexerError> {
1376 let config = config.unwrap_or_default();
1377 self.retry(config.retry_config, || async {
1378 #[cfg(feature = "v2")]
1379 {
1380 let request = photon_api::models::GetValidityProofV2PostRequest {
1381 params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
1382 hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
1383 new_addresses_with_trees: Some(
1384 new_addresses_with_trees
1385 .iter()
1386 .map(|x| photon_api::models::AddressWithTree {
1387 address: x.address.to_base58(),
1388 tree: x.tree.to_string(),
1389 })
1390 .collect(),
1391 ),
1392 }),
1393 ..Default::default()
1394 };
1395
1396 let result = photon_api::apis::default_api::get_validity_proof_v2_post(
1397 &self.configuration,
1398 request,
1399 )
1400 .await?;
1401
1402 let api_response = Self::extract_result_with_error_check(
1403 "get_validity_proof_v2",
1404 result.error,
1405 result.result.map(|r| *r),
1406 )?;
1407 if api_response.context.slot < config.slot {
1408 return Err(IndexerError::IndexerNotSyncedToSlot);
1409 }
1410 let validity_proof =
1411 super::types::ValidityProofWithContext::from_api_model_v2(*api_response.value)?;
1412
1413 Ok(Response {
1414 context: Context {
1415 slot: api_response.context.slot,
1416 },
1417 value: validity_proof,
1418 })
1419 }
1420 #[cfg(not(feature = "v2"))]
1421 {
1422 let request = photon_api::models::GetValidityProofPostRequest {
1423 params: Box::new(photon_api::models::GetValidityProofPostRequestParams {
1424 hashes: Some(hashes.iter().map(|x| x.to_base58()).collect()),
1425 new_addresses_with_trees: Some(
1426 new_addresses_with_trees
1427 .iter()
1428 .map(|x| photon_api::models::AddressWithTree {
1429 address: x.address.to_base58(),
1430 tree: x.tree.to_string(),
1431 })
1432 .collect(),
1433 ),
1434 }),
1435 ..Default::default()
1436 };
1437
1438 let result = photon_api::apis::default_api::get_validity_proof_post(
1439 &self.configuration,
1440 request,
1441 )
1442 .await?;
1443
1444 let api_response = Self::extract_result_with_error_check(
1445 "get_validity_proof",
1446 result.error,
1447 result.result.map(|r| *r),
1448 )?;
1449 if api_response.context.slot < config.slot {
1450 return Err(IndexerError::IndexerNotSyncedToSlot);
1451 }
1452 let validity_proof = super::types::ValidityProofWithContext::from_api_model(
1453 *api_response.value,
1454 hashes.len(),
1455 )?;
1456
1457 Ok(Response {
1458 context: Context {
1459 slot: api_response.context.slot,
1460 },
1461 value: validity_proof,
1462 })
1463 }
1464 })
1465 .await
1466 }
1467
1468 async fn get_queue_info(
1469 &self,
1470 config: Option<IndexerRpcConfig>,
1471 ) -> Result<Response<super::QueueInfoResult>, IndexerError> {
1472 let config = config.unwrap_or_default();
1473 self.retry(config.retry_config, || async {
1474 let request = photon_api::models::GetQueueInfoPostRequest {
1475 ..Default::default()
1476 };
1477
1478 let result =
1479 photon_api::apis::default_api::get_queue_info_post(&self.configuration, request)
1480 .await?;
1481
1482 let api_response = Self::extract_result_with_error_check(
1483 "get_queue_info",
1484 result.error.map(|e| {
1485 Box::new(
1486 photon_api::models::GetBatchAddressUpdateInfoPost200ResponseError {
1487 code: Some(e.code),
1488 message: Some(e.message),
1489 },
1490 )
1491 }),
1492 result.result,
1493 )?;
1494
1495 if api_response.slot < config.slot {
1496 return Err(IndexerError::IndexerNotSyncedToSlot);
1497 }
1498
1499 let queues = api_response
1500 .queues
1501 .iter()
1502 .map(|q| -> Result<_, IndexerError> {
1503 let tree_bytes = super::base58::decode_base58_to_fixed_array(&q.tree)?;
1504 let queue_bytes = super::base58::decode_base58_to_fixed_array(&q.queue)?;
1505
1506 Ok(super::QueueInfo {
1507 tree: Pubkey::new_from_array(tree_bytes),
1508 queue: Pubkey::new_from_array(queue_bytes),
1509 queue_type: q.queue_type,
1510 queue_size: q.queue_size,
1511 })
1512 })
1513 .collect::<Result<Vec<_>, _>>()?;
1514
1515 Ok(Response {
1516 context: Context {
1517 slot: api_response.slot,
1518 },
1519 value: super::QueueInfoResult {
1520 queues,
1521 slot: api_response.slot,
1522 },
1523 })
1524 })
1525 .await
1526 }
1527
1528 async fn get_queue_elements(
1529 &mut self,
1530 _merkle_tree_pubkey: [u8; 32],
1531 _options: super::QueueElementsV2Options,
1532 _config: Option<IndexerRpcConfig>,
1533 ) -> Result<Response<super::QueueElementsResult>, IndexerError> {
1534 #[cfg(not(feature = "v2"))]
1535 unimplemented!();
1536
1537 #[cfg(feature = "v2")]
1538 {
1539 use crate::indexer::OutputQueueData;
1540 let merkle_tree_pubkey = _merkle_tree_pubkey;
1541 let options = _options;
1542 let config = _config.unwrap_or_default();
1543 self.retry(config.retry_config, || async {
1544 let output_queue = options.output_queue_limit.map(|limit| {
1546 let mut req = photon_api::models::QueueRequest::new(limit);
1547 req.start_index = options.output_queue_start_index;
1548 req.zkp_batch_size = options.output_queue_zkp_batch_size;
1549 req
1550 });
1551
1552 let input_queue = options.input_queue_limit.map(|limit| {
1553 let mut req = photon_api::models::QueueRequest::new(limit);
1554 req.start_index = options.input_queue_start_index;
1555 req.zkp_batch_size = options.input_queue_zkp_batch_size;
1556 req
1557 });
1558
1559 let address_queue = options.address_queue_limit.map(|limit| {
1560 let mut req = photon_api::models::QueueRequest::new(limit);
1561 req.start_index = options.address_queue_start_index;
1562 req.zkp_batch_size = options.address_queue_zkp_batch_size;
1563 req
1564 });
1565
1566 let mut params = photon_api::models::GetQueueElementsPostRequestParams::new(
1567 bs58::encode(merkle_tree_pubkey).into_string(),
1568 );
1569 params.output_queue = output_queue;
1570 params.input_queue = input_queue;
1571 params.address_queue = address_queue;
1572
1573 let request = photon_api::models::GetQueueElementsPostRequest {
1574 params: Box::new(params),
1575 ..Default::default()
1576 };
1577
1578 tracing::info!(
1579 "get_queue_elements request: output_queue={:?}, input_queue={:?}",
1580 request.params.output_queue.as_ref().map(|q| (
1581 q.limit,
1582 q.start_index,
1583 q.zkp_batch_size
1584 )),
1585 request.params.input_queue.as_ref().map(|q| (
1586 q.limit,
1587 q.start_index,
1588 q.zkp_batch_size
1589 )),
1590 );
1591
1592 let result = photon_api::apis::default_api::get_queue_elements_post(
1593 &self.configuration,
1594 request,
1595 )
1596 .await?;
1597
1598 let api_response = Self::extract_result_with_error_check(
1599 "get_queue_elements",
1600 result.error,
1601 result.result.map(|r| *r),
1602 )?;
1603
1604 if api_response.context.slot < config.slot {
1605 return Err(IndexerError::IndexerNotSyncedToSlot);
1606 }
1607
1608 let state_queue = if let Some(state) = api_response.state_queue {
1609 let nodes: Vec<u64> = state.nodes.iter().map(|n| n.index).collect();
1611 let node_hashes: Result<Vec<[u8; 32]>, IndexerError> = state
1612 .nodes
1613 .iter()
1614 .map(|n| Hash::from_base58(&n.hash))
1615 .collect();
1616 let initial_root = Hash::from_base58(&state.initial_root)?;
1617
1618 let output_queue = if let Some(output) = state.output_queue {
1619 let account_hashes: Result<Vec<[u8; 32]>, IndexerError> = output
1620 .account_hashes
1621 .iter()
1622 .map(|h| Hash::from_base58(h))
1623 .collect();
1624 let old_leaves: Result<Vec<[u8; 32]>, IndexerError> =
1625 output.leaves.iter().map(|h| Hash::from_base58(h)).collect();
1626 let leaves_hash_chains: Result<Vec<[u8; 32]>, IndexerError> = output
1627 .leaves_hash_chains
1628 .iter()
1629 .map(|h| Hash::from_base58(h))
1630 .collect();
1631
1632 Some(OutputQueueData {
1633 leaf_indices: output.leaf_indices,
1634 account_hashes: account_hashes?,
1635 old_leaves: old_leaves?,
1636 first_queue_index: output.first_queue_index,
1637 next_index: output.next_index,
1638 leaves_hash_chains: leaves_hash_chains?,
1639 })
1640 } else {
1641 None
1642 };
1643
1644 let input_queue = if let Some(input) = state.input_queue {
1645 let account_hashes: Result<Vec<[u8; 32]>, IndexerError> = input
1646 .account_hashes
1647 .iter()
1648 .map(|h| Hash::from_base58(h))
1649 .collect();
1650 let current_leaves: Result<Vec<[u8; 32]>, IndexerError> =
1651 input.leaves.iter().map(|h| Hash::from_base58(h)).collect();
1652 let tx_hashes: Result<Vec<[u8; 32]>, IndexerError> = input
1653 .tx_hashes
1654 .iter()
1655 .map(|h| Hash::from_base58(h))
1656 .collect();
1657 let nullifiers: Result<Vec<[u8; 32]>, IndexerError> = input
1658 .nullifiers
1659 .iter()
1660 .map(|h| Hash::from_base58(h))
1661 .collect();
1662 let leaves_hash_chains: Result<Vec<[u8; 32]>, IndexerError> = input
1663 .leaves_hash_chains
1664 .iter()
1665 .map(|h| Hash::from_base58(h))
1666 .collect();
1667
1668 Some(super::InputQueueData {
1669 leaf_indices: input.leaf_indices,
1670 account_hashes: account_hashes?,
1671 current_leaves: current_leaves?,
1672 tx_hashes: tx_hashes?,
1673 nullifiers: nullifiers?,
1674 first_queue_index: input.first_queue_index,
1675 leaves_hash_chains: leaves_hash_chains?,
1676 })
1677 } else {
1678 None
1679 };
1680
1681 Some(super::StateQueueData {
1682 nodes,
1683 node_hashes: node_hashes?,
1684 initial_root,
1685 root_seq: state.root_seq,
1686 output_queue,
1687 input_queue,
1688 })
1689 } else {
1690 None
1691 };
1692
1693 let address_queue = if let Some(address) = api_response.address_queue {
1695 let addresses: Result<Vec<[u8; 32]>, IndexerError> = address
1696 .addresses
1697 .iter()
1698 .map(|h| Hash::from_base58(h))
1699 .collect();
1700
1701 let low_element_values: Result<Vec<[u8; 32]>, IndexerError> = address
1702 .low_element_values
1703 .iter()
1704 .map(|h| Hash::from_base58(h))
1705 .collect();
1706
1707 let low_element_next_values: Result<Vec<[u8; 32]>, IndexerError> = address
1708 .low_element_next_values
1709 .iter()
1710 .map(|h| Hash::from_base58(h))
1711 .collect();
1712
1713 let nodes: Vec<u64> = address.nodes.iter().map(|n| n.index).collect();
1716 let node_hashes: Result<Vec<[u8; 32]>, IndexerError> = address
1717 .nodes
1718 .iter()
1719 .map(|n| Hash::from_base58(&n.hash))
1720 .collect();
1721
1722 let initial_root = Hash::from_base58(&address.initial_root)?;
1723
1724 let leaves_hash_chains: Result<Vec<[u8; 32]>, IndexerError> = address
1725 .leaves_hash_chains
1726 .iter()
1727 .map(|h| Hash::from_base58(h))
1728 .collect();
1729
1730 let subtrees: Result<Vec<[u8; 32]>, IndexerError> = address
1731 .subtrees
1732 .iter()
1733 .map(|h| Hash::from_base58(h))
1734 .collect();
1735
1736 Some(super::AddressQueueData {
1737 addresses: addresses?,
1738 low_element_values: low_element_values?,
1739 low_element_next_values: low_element_next_values?,
1740 low_element_indices: address.low_element_indices,
1741 low_element_next_indices: address.low_element_next_indices,
1742 nodes,
1743 node_hashes: node_hashes?,
1744 initial_root,
1745 leaves_hash_chains: leaves_hash_chains?,
1746 subtrees: subtrees?,
1747 start_index: address.start_index,
1748 root_seq: address.root_seq,
1749 })
1750 } else {
1751 None
1752 };
1753
1754 Ok(Response {
1755 context: Context {
1756 slot: api_response.context.slot,
1757 },
1758 value: super::QueueElementsResult {
1759 state_queue,
1760 address_queue,
1761 },
1762 })
1763 })
1764 .await
1765 }
1766 }
1767
1768 async fn get_subtrees(
1769 &self,
1770 _merkle_tree_pubkey: [u8; 32],
1771 _config: Option<IndexerRpcConfig>,
1772 ) -> Result<Response<Items<[u8; 32]>>, IndexerError> {
1773 #[cfg(not(feature = "v2"))]
1774 unimplemented!();
1775 #[cfg(feature = "v2")]
1776 {
1777 todo!();
1778 }
1779 }
1780}