1use std::sync::Arc;
10
11use serde_json::{json, Value};
12use tracing::warn;
13
14use tidepool_rpc::cache::{CacheStore, SearchFilter};
15use tidepool_rpc::cnft::{index_tree, CnftStore, IndexTreeOptions};
16use tidepool_rpc::compat::{manifest, summarize};
17use tidepool_rpc::compatibility::compatibility;
18use tidepool_rpc::das::{
19 get_asset_full, get_asset_proof, get_asset_proof_batch, get_assets_by_authority,
20 get_assets_by_creator, get_assets_by_group, get_assets_by_owner, get_balances,
21 get_nft_editions, get_token_accounts, search_assets, AccountDecoder, TokenAccountsFilter,
22};
23use tidepool_rpc::enhanced::{
24 enrich_token_standards, get_transactions, get_transactions_by_address,
25 get_transactions_for_address, get_transfers_by_address, Direction, Sort,
26 TransactionsByAddressOptions, TransactionsForAddressOptions, TransfersByAddressOptions,
27 TxStatus,
28};
29use tidepool_rpc::priority_fee::{compute_levels, percentile_at, PriorityLevel};
30use tidepool_rpc::upstream::UpstreamClient;
31use tidepool_rpc::webhooks::{PostClient, WebhookError, WebhookInput};
32
33use crate::json_rpc::{codes, fail, ok, JsonRpcRequest};
34use crate::webhook_runtime::WebhookRuntime;
35
/// Every JSON-RPC method name this router knows how to dispatch.
///
/// Each variant corresponds to exactly one wire name; [`Method::to_wire`] is
/// the authoritative mapping and [`Method::from_wire`] is its inverse.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method {
    /// Wire name `getAsset`.
    GetAsset,
    /// Wire name `getAssetBatch`.
    GetAssetBatch,
    /// Wire name `getAssetProof`.
    GetAssetProof,
    /// Wire name `getAssetProofBatch`.
    GetAssetProofBatch,
    /// Wire name `getAssetsByOwner`.
    GetAssetsByOwner,
    /// Wire name `getAssetsByAuthority`.
    GetAssetsByAuthority,
    /// Wire name `getAssetsByCreator`.
    GetAssetsByCreator,
    /// Wire name `getAssetsByGroup`.
    GetAssetsByGroup,
    /// Wire name `searchAssets`.
    SearchAssets,
    /// Wire name `getNftEditions`.
    GetNftEditions,
    /// Wire name `getTokenAccounts`.
    GetTokenAccounts,
    /// Wire name `getProgramAccountsV2`.
    GetProgramAccountsV2,
    /// Wire name `getTokenAccountsByOwnerV2`.
    GetTokenAccountsByOwnerV2,
    /// Wire name `getTransfersByAddress`.
    GetTransfersByAddress,
    /// Wire name `getTransactionsForAddress`.
    GetTransactionsForAddress,
    /// Wire name `getPriorityFeeEstimate`.
    GetPriorityFeeEstimate,
    /// Wire name `sendTransactionWithSender`.
    SendTransactionWithSender,
    /// Wire name `tidepool_info`.
    TidepoolInfo,
    /// Wire name `tidepool_indexTree`.
    TidepoolIndexTree,
    /// Wire name `tidepool_exportTreeSnapshot`.
    TidepoolExportTreeSnapshot,
    /// Wire name `tidepool_loadTreeSnapshot`.
    TidepoolLoadTreeSnapshot,
}

impl Method {
    /// All variants, in declaration order. `from_wire` searches this list, so
    /// a newly added variant must be appended here to become resolvable.
    const ALL: &'static [Self] = &[
        Self::GetAsset,
        Self::GetAssetBatch,
        Self::GetAssetProof,
        Self::GetAssetProofBatch,
        Self::GetAssetsByOwner,
        Self::GetAssetsByAuthority,
        Self::GetAssetsByCreator,
        Self::GetAssetsByGroup,
        Self::SearchAssets,
        Self::GetNftEditions,
        Self::GetTokenAccounts,
        Self::GetProgramAccountsV2,
        Self::GetTokenAccountsByOwnerV2,
        Self::GetTransfersByAddress,
        Self::GetTransactionsForAddress,
        Self::GetPriorityFeeEstimate,
        Self::SendTransactionWithSender,
        Self::TidepoolInfo,
        Self::TidepoolIndexTree,
        Self::TidepoolExportTreeSnapshot,
        Self::TidepoolLoadTreeSnapshot,
    ];

    /// Resolves a wire-level method name to its variant.
    ///
    /// Matching is exact and case-sensitive; any name that is not produced by
    /// [`Method::to_wire`] yields `None`.
    #[must_use]
    pub fn from_wire(name: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.to_wire() == name)
    }

    /// Returns the exact method name used on the wire for this variant.
    #[must_use]
    pub fn to_wire(self) -> &'static str {
        match self {
            Self::GetAsset => "getAsset",
            Self::GetAssetBatch => "getAssetBatch",
            Self::GetAssetProof => "getAssetProof",
            Self::GetAssetProofBatch => "getAssetProofBatch",
            Self::GetAssetsByOwner => "getAssetsByOwner",
            Self::GetAssetsByAuthority => "getAssetsByAuthority",
            Self::GetAssetsByCreator => "getAssetsByCreator",
            Self::GetAssetsByGroup => "getAssetsByGroup",
            Self::SearchAssets => "searchAssets",
            Self::GetNftEditions => "getNftEditions",
            Self::GetTokenAccounts => "getTokenAccounts",
            Self::GetProgramAccountsV2 => "getProgramAccountsV2",
            Self::GetTokenAccountsByOwnerV2 => "getTokenAccountsByOwnerV2",
            Self::GetTransfersByAddress => "getTransfersByAddress",
            Self::GetTransactionsForAddress => "getTransactionsForAddress",
            Self::GetPriorityFeeEstimate => "getPriorityFeeEstimate",
            Self::SendTransactionWithSender => "sendTransactionWithSender",
            Self::TidepoolInfo => "tidepool_info",
            Self::TidepoolIndexTree => "tidepool_indexTree",
            Self::TidepoolExportTreeSnapshot => "tidepool_exportTreeSnapshot",
            Self::TidepoolLoadTreeSnapshot => "tidepool_loadTreeSnapshot",
        }
    }
}
136
137pub struct Ctx<S, C, U>
140where
141 S: CnftStore + ?Sized,
142 C: CacheStore + ?Sized,
143 U: UpstreamClient + ?Sized + 'static,
144{
145 pub cnft: Arc<S>,
146 pub cache: Arc<C>,
147 pub upstream: Arc<U>,
148 pub decoders: Arc<[Arc<dyn AccountDecoder>]>,
149 pub webhooks: Arc<WebhookRuntime<U, dyn PostClient>>,
150}
151
152impl<S, C, U> Clone for Ctx<S, C, U>
153where
154 S: CnftStore + ?Sized,
155 C: CacheStore + ?Sized,
156 U: UpstreamClient + ?Sized + 'static,
157{
158 fn clone(&self) -> Self {
159 Self {
160 cnft: Arc::clone(&self.cnft),
161 cache: Arc::clone(&self.cache),
162 upstream: Arc::clone(&self.upstream),
163 decoders: Arc::clone(&self.decoders),
164 webhooks: Arc::clone(&self.webhooks),
165 }
166 }
167}
168
169pub async fn dispatch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Option<Value>
172where
173 S: CnftStore + ?Sized,
174 C: CacheStore + ?Sized,
175 U: UpstreamClient + ?Sized + 'static,
176{
177 let method = Method::from_wire(&req.method)?;
178 Some(match method {
179 Method::GetAsset => handle_get_asset(ctx, req).await,
180 Method::GetAssetBatch => handle_get_asset_batch(ctx, req).await,
181 Method::GetAssetProof => handle_get_asset_proof(ctx, req).await,
182 Method::GetAssetProofBatch => handle_get_asset_proof_batch(ctx, req).await,
183 Method::GetAssetsByOwner => handle_get_assets_by_owner(ctx, req).await,
184 Method::GetAssetsByAuthority => handle_get_assets_by_authority(ctx, req).await,
185 Method::GetAssetsByCreator => handle_get_assets_by_creator(ctx, req).await,
186 Method::GetAssetsByGroup => handle_get_assets_by_group(ctx, req).await,
187 Method::SearchAssets => handle_search_assets(ctx, req).await,
188 Method::GetNftEditions => handle_get_nft_editions(ctx, req).await,
189 Method::GetTokenAccounts => handle_get_token_accounts(ctx, req).await,
190 Method::GetProgramAccountsV2 => handle_get_program_accounts_v2(ctx, req).await,
196 Method::GetTokenAccountsByOwnerV2 => handle_get_token_accounts_by_owner_v2(ctx, req).await,
197 Method::GetTransfersByAddress => handle_get_transfers_by_address(ctx, req).await,
198 Method::GetTransactionsForAddress => handle_get_transactions_for_address(ctx, req).await,
199 Method::GetPriorityFeeEstimate => handle_get_priority_fee_estimate(ctx, req).await,
200 Method::SendTransactionWithSender => handle_send_transaction_with_sender(ctx, req).await,
201 Method::TidepoolInfo => handle_tidepool_info(ctx, req).await,
202 Method::TidepoolIndexTree => handle_tidepool_index_tree(ctx, req).await,
203 Method::TidepoolExportTreeSnapshot => handle_tidepool_export_tree_snapshot(ctx, req).await,
204 Method::TidepoolLoadTreeSnapshot => handle_tidepool_load_tree_snapshot(ctx, req).await,
205 })
206}
207
208async fn handle_get_asset<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
211where
212 S: CnftStore + ?Sized,
213 C: CacheStore + ?Sized,
214 U: UpstreamClient + ?Sized + 'static,
215{
216 let Some(asset_id) = extract_id_param(&req.params) else {
217 return fail(&req.id, codes::INVALID_PARAMS, "missing `id` param");
218 };
219 match get_asset_full(
220 &*ctx.cnft,
221 &*ctx.cache,
222 &*ctx.upstream,
223 &ctx.decoders,
224 &asset_id,
225 )
226 .await
227 {
228 Ok(Some(asset)) => ok(&req.id, serde_json::to_value(asset).unwrap_or(Value::Null)),
229 Ok(None) => fail(&req.id, codes::INTERNAL_ERROR, "Asset not found"),
230 Err(e) => {
231 warn!(method = "getAsset", err = %e, "handler failed");
232 fail(&req.id, codes::INTERNAL_ERROR, format!("{e}"))
233 }
234 }
235}
236
237async fn handle_get_asset_batch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
238where
239 S: CnftStore + ?Sized,
240 C: CacheStore + ?Sized,
241 U: UpstreamClient + ?Sized + 'static,
242{
243 let Some(ids) = req.params.get("ids").and_then(Value::as_array) else {
244 return fail(&req.id, codes::INVALID_PARAMS, "missing `ids` array");
245 };
246 let ids: Vec<String> = ids
247 .iter()
248 .filter_map(|v| v.as_str().map(String::from))
249 .collect();
250 match tidepool_rpc::das::get_asset_batch(
251 &*ctx.cnft,
252 &*ctx.cache,
253 &*ctx.upstream,
254 &ctx.decoders,
255 &ids,
256 )
257 .await
258 {
259 Ok(results) => ok(
260 &req.id,
261 serde_json::to_value(results).unwrap_or(Value::Null),
262 ),
263 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
264 }
265}
266
267async fn handle_get_asset_proof<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
268where
269 S: CnftStore + ?Sized,
270 C: CacheStore + ?Sized,
271 U: UpstreamClient + ?Sized + 'static,
272{
273 let Some(asset_id) = extract_id_param(&req.params) else {
274 return fail(&req.id, codes::INVALID_PARAMS, "missing `id` param");
275 };
276 let Some(id_bytes) = bs58_to_32(&asset_id) else {
277 return fail(
278 &req.id,
279 codes::INVALID_PARAMS,
280 "`id` is not a valid 32-byte base58 address",
281 );
282 };
283 match get_asset_proof(&*ctx.cnft, &id_bytes).await {
284 Ok(Some(p)) => ok(&req.id, serde_json::to_value(p).unwrap_or(Value::Null)),
285 Ok(None) => fail(
286 &req.id,
287 codes::INTERNAL_ERROR,
288 "Asset not found or tree not indexed",
289 ),
290 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
291 }
292}
293
294async fn handle_get_asset_proof_batch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
295where
296 S: CnftStore + ?Sized,
297 C: CacheStore + ?Sized,
298 U: UpstreamClient + ?Sized + 'static,
299{
300 let Some(ids) = req.params.get("ids").and_then(Value::as_array) else {
301 return fail(&req.id, codes::INVALID_PARAMS, "missing `ids` array");
302 };
303 let id_bytes: Vec<[u8; 32]> = ids
304 .iter()
305 .filter_map(|v| v.as_str())
306 .filter_map(bs58_to_32)
307 .collect();
308 match get_asset_proof_batch(&*ctx.cnft, &id_bytes).await {
309 Ok(results) => {
310 let map: serde_json::Map<String, Value> = ids
311 .iter()
312 .filter_map(|v| v.as_str())
313 .zip(results.into_iter())
314 .map(|(id, proof)| {
315 (
316 id.to_string(),
317 proof.map_or(Value::Null, |p| {
318 serde_json::to_value(p).unwrap_or(Value::Null)
319 }),
320 )
321 })
322 .collect();
323 ok(&req.id, Value::Object(map))
324 }
325 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
326 }
327}
328
329async fn handle_get_assets_by_owner<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
330where
331 S: CnftStore + ?Sized,
332 C: CacheStore + ?Sized,
333 U: UpstreamClient + ?Sized + 'static,
334{
335 let Some(owner) = req.params.get("ownerAddress").and_then(Value::as_str) else {
336 return fail(&req.id, codes::INVALID_PARAMS, "missing `ownerAddress`");
337 };
338 match get_assets_by_owner(&*ctx.cache, owner).await {
339 Ok(items) => ok(&req.id, serde_json::json!({ "items": items })),
340 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
341 }
342}
343
344async fn handle_get_assets_by_authority<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
345where
346 S: CnftStore + ?Sized,
347 C: CacheStore + ?Sized,
348 U: UpstreamClient + ?Sized + 'static,
349{
350 let Some(authority) = req.params.get("authorityAddress").and_then(Value::as_str) else {
351 return fail(&req.id, codes::INVALID_PARAMS, "missing `authorityAddress`");
352 };
353 match get_assets_by_authority(&*ctx.cache, authority).await {
354 Ok(items) => ok(&req.id, json!({ "items": items })),
355 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
356 }
357}
358
359async fn handle_get_assets_by_creator<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
360where
361 S: CnftStore + ?Sized,
362 C: CacheStore + ?Sized,
363 U: UpstreamClient + ?Sized + 'static,
364{
365 let Some(creator) = req.params.get("creatorAddress").and_then(Value::as_str) else {
366 return fail(&req.id, codes::INVALID_PARAMS, "missing `creatorAddress`");
367 };
368 let only_verified = req
369 .params
370 .get("onlyVerified")
371 .and_then(Value::as_bool)
372 .unwrap_or(false);
373 match get_assets_by_creator(&*ctx.cache, creator, only_verified).await {
374 Ok(items) => ok(&req.id, json!({ "items": items })),
375 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
376 }
377}
378
379async fn handle_get_assets_by_group<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
380where
381 S: CnftStore + ?Sized,
382 C: CacheStore + ?Sized,
383 U: UpstreamClient + ?Sized + 'static,
384{
385 let gk = req.params.get("groupKey").and_then(Value::as_str);
386 let gv = req.params.get("groupValue").and_then(Value::as_str);
387 let (Some(gk), Some(gv)) = (gk, gv) else {
388 return fail(
389 &req.id,
390 codes::INVALID_PARAMS,
391 "missing `groupKey` / `groupValue`",
392 );
393 };
394 match get_assets_by_group(&*ctx.cache, gk, gv).await {
395 Ok(items) => ok(&req.id, json!({ "items": items })),
396 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
397 }
398}
399
400async fn handle_search_assets<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
401where
402 S: CnftStore + ?Sized,
403 C: CacheStore + ?Sized,
404 U: UpstreamClient + ?Sized + 'static,
405{
406 let filter = SearchFilter {
407 owner_address: req
408 .params
409 .get("ownerAddress")
410 .and_then(Value::as_str)
411 .map(String::from),
412 authority_address: req
413 .params
414 .get("authorityAddress")
415 .and_then(Value::as_str)
416 .map(String::from),
417 creator_address: req
418 .params
419 .get("creatorAddress")
420 .and_then(Value::as_str)
421 .map(String::from),
422 creator_verified: req.params.get("creatorVerified").and_then(Value::as_bool),
423 grouping: req
424 .params
425 .get("grouping")
426 .and_then(Value::as_array)
427 .and_then(|arr| {
428 let k = arr.first()?.as_str()?.to_string();
429 let v = arr.get(1)?.as_str()?.to_string();
430 Some((k, v))
431 }),
432 interface: req
433 .params
434 .get("interface")
435 .and_then(Value::as_str)
436 .map(String::from),
437 burnt: req.params.get("burnt").and_then(Value::as_bool),
438 };
439 match search_assets(&*ctx.cache, &filter).await {
440 Ok(items) => ok(&req.id, json!({ "items": items })),
441 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
442 }
443}
444
445async fn handle_get_nft_editions<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
450where
451 S: CnftStore + ?Sized,
452 C: CacheStore + ?Sized,
453 U: UpstreamClient + ?Sized + 'static,
454{
455 let mint = req
456 .params
457 .get("mint")
458 .or_else(|| req.params.get("id"))
459 .and_then(Value::as_str);
460 let Some(mint) = mint else {
461 return fail(
462 &req.id,
463 codes::INVALID_PARAMS,
464 "getNftEditions requires `mint`",
465 );
466 };
467 let page = req.params.get("page").and_then(Value::as_u64).unwrap_or(1);
468 let limit = req
470 .params
471 .get("limit")
472 .and_then(Value::as_u64)
473 .unwrap_or(100);
474
475 match get_nft_editions(
476 &*ctx.cache,
477 &*ctx.upstream,
478 &ctx.decoders,
479 mint,
480 page,
481 limit,
482 )
483 .await
484 {
485 Ok(Some(result)) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
486 Ok(None) => ok(&req.id, Value::Null),
487 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
488 }
489}
490
491async fn handle_get_token_accounts<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
496where
497 S: CnftStore + ?Sized,
498 C: CacheStore + ?Sized,
499 U: UpstreamClient + ?Sized + 'static,
500{
501 let owner = req
502 .params
503 .get("owner")
504 .and_then(Value::as_str)
505 .map(String::from);
506 let mint = req
507 .params
508 .get("mint")
509 .and_then(Value::as_str)
510 .map(String::from);
511 let page = req.params.get("page").and_then(Value::as_u64).unwrap_or(1);
512 let limit = req
513 .params
514 .get("limit")
515 .and_then(Value::as_u64)
516 .unwrap_or(100);
517 let show_zero_balance = req
518 .params
519 .pointer("/displayOptions/showZeroBalance")
520 .and_then(Value::as_bool)
521 .unwrap_or(false);
522
523 let filter = TokenAccountsFilter {
524 owner,
525 mint,
526 page,
527 limit,
528 show_zero_balance,
529 };
530
531 match get_token_accounts(&*ctx.upstream, &filter).await {
532 Ok(result) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
533 Err(e) => {
534 let code = match &e {
536 tidepool_rpc::das::DasError::BadRequest(_) => codes::INVALID_PARAMS,
537 _ => codes::INTERNAL_ERROR,
538 };
539 fail(&req.id, code, format!("{e}"))
540 }
541 }
542}
543
544pub(crate) async fn handle_get_balances<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
548where
549 S: CnftStore + ?Sized,
550 C: CacheStore + ?Sized,
551 U: UpstreamClient + ?Sized + 'static,
552{
553 let owner = req.params.get("owner").and_then(Value::as_str).or_else(|| {
555 req.params
556 .as_array()
557 .and_then(|a| a.first())
558 .and_then(Value::as_str)
559 });
560 let Some(owner) = owner else {
561 return fail(
562 &req.id,
563 codes::INVALID_PARAMS,
564 "getBalances requires `owner`",
565 );
566 };
567 match get_balances(&*ctx.upstream, owner).await {
568 Ok(result) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
569 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
570 }
571}
572
573fn parse_webhook_input(params: &Value) -> WebhookInput {
579 let url = params
583 .get("webhookURL")
584 .or_else(|| params.get("webhook_url"))
585 .and_then(Value::as_str)
586 .map(String::from);
587 let addresses = params
588 .get("accountAddresses")
589 .or_else(|| params.get("account_addresses"))
590 .and_then(Value::as_array)
591 .map(|a| {
592 a.iter()
593 .filter_map(|v| v.as_str().map(String::from))
594 .collect()
595 });
596 let transaction_types = params
597 .get("transactionTypes")
598 .or_else(|| params.get("transaction_types"))
599 .and_then(Value::as_array)
600 .map(|a| {
601 a.iter()
602 .filter_map(|v| v.as_str().map(String::from))
603 .collect()
604 })
605 .unwrap_or_default();
606 let txn_status = params
607 .get("txnStatus")
608 .or_else(|| params.get("txn_status"))
609 .and_then(Value::as_str)
610 .map(String::from);
611 let webhook_type = params
612 .get("webhookType")
613 .or_else(|| params.get("webhook_type"))
614 .and_then(Value::as_str)
615 .map(String::from);
616 let auth_header = params
617 .get("authHeader")
618 .or_else(|| params.get("auth_header"))
619 .and_then(Value::as_str)
620 .map(String::from);
621 WebhookInput {
622 webhook_url: url,
623 account_addresses: addresses,
624 transaction_types,
625 txn_status,
626 webhook_type,
627 auth_header,
628 }
629}
630
631fn webhook_error_to_response(id: &Value, e: &WebhookError) -> Value {
632 let code = match e {
633 WebhookError::BadRequest(_) => codes::INVALID_PARAMS,
634 WebhookError::NotFound { .. } => codes::INTERNAL_ERROR,
635 };
636 fail(id, code, format!("{e}"))
637}
638
639pub(crate) async fn handle_create_webhook<S, C, U>(
640 ctx: &Ctx<S, C, U>,
641 req: &JsonRpcRequest,
642) -> Value
643where
644 S: CnftStore + ?Sized,
645 C: CacheStore + ?Sized,
646 U: UpstreamClient + ?Sized + 'static,
647{
648 let input = parse_webhook_input(&req.params);
649 match ctx.webhooks.create(input).await {
650 Ok(wh) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
651 Err(e) => webhook_error_to_response(&req.id, &e),
652 }
653}
654
655pub(crate) async fn handle_get_all_webhooks<S, C, U>(
656 ctx: &Ctx<S, C, U>,
657 req: &JsonRpcRequest,
658) -> Value
659where
660 S: CnftStore + ?Sized,
661 C: CacheStore + ?Sized,
662 U: UpstreamClient + ?Sized + 'static,
663{
664 match ctx.webhooks.list().await {
665 Ok(items) => ok(&req.id, serde_json::to_value(items).unwrap_or(Value::Null)),
666 Err(e) => webhook_error_to_response(&req.id, &e),
667 }
668}
669
670pub(crate) async fn handle_get_webhook_by_id<S, C, U>(
671 ctx: &Ctx<S, C, U>,
672 req: &JsonRpcRequest,
673) -> Value
674where
675 S: CnftStore + ?Sized,
676 C: CacheStore + ?Sized,
677 U: UpstreamClient + ?Sized + 'static,
678{
679 let id = req
680 .params
681 .get("webhookID")
682 .or_else(|| req.params.get("webhook_id"))
683 .or_else(|| req.params.get("id"))
684 .and_then(Value::as_str);
685 let Some(id) = id else {
686 return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
687 };
688 match ctx.webhooks.get(id).await {
689 Ok(Some(wh)) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
690 Ok(None) => ok(&req.id, Value::Null),
691 Err(e) => webhook_error_to_response(&req.id, &e),
692 }
693}
694
695pub(crate) async fn handle_edit_webhook<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
696where
697 S: CnftStore + ?Sized,
698 C: CacheStore + ?Sized,
699 U: UpstreamClient + ?Sized + 'static,
700{
701 let id = req
702 .params
703 .get("webhookID")
704 .or_else(|| req.params.get("webhook_id"))
705 .or_else(|| req.params.get("id"))
706 .and_then(Value::as_str);
707 let Some(id) = id else {
708 return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
709 };
710 let input = parse_webhook_input(&req.params);
711 match ctx.webhooks.edit(id, input).await {
712 Ok(wh) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
713 Err(e) => webhook_error_to_response(&req.id, &e),
714 }
715}
716
717pub(crate) async fn handle_delete_webhook<S, C, U>(
718 ctx: &Ctx<S, C, U>,
719 req: &JsonRpcRequest,
720) -> Value
721where
722 S: CnftStore + ?Sized,
723 C: CacheStore + ?Sized,
724 U: UpstreamClient + ?Sized + 'static,
725{
726 let id = req
727 .params
728 .get("webhookID")
729 .or_else(|| req.params.get("webhook_id"))
730 .or_else(|| req.params.get("id"))
731 .and_then(Value::as_str);
732 let Some(id) = id else {
733 return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
734 };
735 match ctx.webhooks.delete(id).await {
736 Ok(removed) => ok(&req.id, json!({ "deleted": removed })),
737 Err(e) => webhook_error_to_response(&req.id, &e),
738 }
739}
740
741pub(crate) async fn handle_get_transactions<S, C, U>(
746 ctx: &Ctx<S, C, U>,
747 req: &JsonRpcRequest,
748) -> Value
749where
750 S: CnftStore + ?Sized,
751 C: CacheStore + ?Sized,
752 U: UpstreamClient + ?Sized + 'static,
753{
754 let sigs: Vec<String> = req
755 .params
756 .get("signatures")
757 .or_else(|| {
758 req.params.as_array().and_then(|a| a.first())
760 })
761 .and_then(Value::as_array)
762 .map(|arr| {
763 arr.iter()
764 .filter_map(|v| v.as_str().map(String::from))
765 .collect()
766 })
767 .unwrap_or_default();
768 if sigs.is_empty() {
769 return fail(
770 &req.id,
771 codes::INVALID_PARAMS,
772 "getTransactions requires a non-empty `signatures` array",
773 );
774 }
775 let mut out = get_transactions(&*ctx.upstream, &sigs).await;
776 enrich_token_standards(&*ctx.cache, &mut out).await;
780 ok(&req.id, serde_json::to_value(out).unwrap_or(Value::Null))
781}
782
783pub(crate) async fn handle_get_transactions_by_address<S, C, U>(
786 ctx: &Ctx<S, C, U>,
787 req: &JsonRpcRequest,
788) -> Value
789where
790 S: CnftStore + ?Sized,
791 C: CacheStore + ?Sized,
792 U: UpstreamClient + ?Sized + 'static,
793{
794 let Some(address) = req
795 .params
796 .get("address")
797 .and_then(Value::as_str)
798 .map(String::from)
799 else {
800 return fail(
801 &req.id,
802 codes::INVALID_PARAMS,
803 "getTransactionsByAddress requires `address`",
804 );
805 };
806 let options = TransactionsByAddressOptions {
807 before: req
808 .params
809 .get("before")
810 .and_then(Value::as_str)
811 .map(String::from),
812 until: req
813 .params
814 .get("until")
815 .and_then(Value::as_str)
816 .map(String::from),
817 limit: req.params.get("limit").and_then(Value::as_u64),
818 types: req
819 .params
820 .get("type")
821 .and_then(Value::as_str)
822 .map(|s| vec![s.to_string()])
823 .or_else(|| {
824 req.params
825 .get("types")
826 .and_then(Value::as_array)
827 .map(|arr| {
828 arr.iter()
829 .filter_map(|v| v.as_str().map(String::from))
830 .collect()
831 })
832 })
833 .unwrap_or_default(),
834 };
835 let mut out = get_transactions_by_address(&*ctx.upstream, &address, &options).await;
836 enrich_token_standards(&*ctx.cache, &mut out).await;
837 ok(&req.id, serde_json::to_value(out).unwrap_or(Value::Null))
838}
839
840pub(crate) async fn handle_get_transfers_by_address<S, C, U>(
845 ctx: &Ctx<S, C, U>,
846 req: &JsonRpcRequest,
847) -> Value
848where
849 S: CnftStore + ?Sized,
850 C: CacheStore + ?Sized,
851 U: UpstreamClient + ?Sized + 'static,
852{
853 let Some(address) = req
857 .params
858 .get("address")
859 .and_then(Value::as_str)
860 .map(String::from)
861 else {
862 return fail(
863 &req.id,
864 codes::INVALID_PARAMS,
865 "getTransfersByAddress requires `address`",
866 );
867 };
868 let opts = TransfersByAddressOptions {
869 mint: req
870 .params
871 .get("mint")
872 .and_then(Value::as_str)
873 .map(String::from),
874 direction: req
875 .params
876 .get("direction")
877 .and_then(Value::as_str)
878 .and_then(Direction::parse),
879 limit: req.params.get("limit").and_then(Value::as_u64),
880 sort: req
881 .params
882 .get("sort")
883 .and_then(Value::as_str)
884 .map(Sort::parse)
885 .unwrap_or_default(),
886 pagination_token: req
887 .params
888 .get("paginationToken")
889 .and_then(Value::as_str)
890 .map(String::from),
891 };
892 let result = get_transfers_by_address(&*ctx.upstream, &address, &opts).await;
893 ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null))
894}
895
896pub(crate) async fn handle_get_transactions_for_address<S, C, U>(
901 ctx: &Ctx<S, C, U>,
902 req: &JsonRpcRequest,
903) -> Value
904where
905 S: CnftStore + ?Sized,
906 C: CacheStore + ?Sized,
907 U: UpstreamClient + ?Sized + 'static,
908{
909 let Some(address) = req
910 .params
911 .get("address")
912 .and_then(Value::as_str)
913 .map(String::from)
914 else {
915 return fail(
916 &req.id,
917 codes::INVALID_PARAMS,
918 "getTransactionsForAddress requires `address`",
919 );
920 };
921 let opts = TransactionsForAddressOptions {
922 limit: req.params.get("limit").and_then(Value::as_u64),
923 pagination_token: req
924 .params
925 .get("paginationToken")
926 .and_then(Value::as_str)
927 .map(String::from),
928 min_slot: req.params.get("minSlot").and_then(Value::as_u64),
929 max_slot: req.params.get("maxSlot").and_then(Value::as_u64),
930 status: req
931 .params
932 .get("status")
933 .and_then(Value::as_str)
934 .and_then(TxStatus::parse),
935 };
936 let mut result = get_transactions_for_address(&*ctx.upstream, &address, &opts).await;
937 enrich_token_standards(&*ctx.cache, &mut result.data).await;
938 ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null))
939}
940
941async fn handle_get_program_accounts_v2<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
947where
948 S: CnftStore + ?Sized,
949 C: CacheStore + ?Sized,
950 U: UpstreamClient + ?Sized + 'static,
951{
952 let Some(program_id) = req
953 .params
954 .get("programId")
955 .and_then(Value::as_str)
956 .map(String::from)
957 else {
958 return fail(
959 &req.id,
960 codes::INVALID_PARAMS,
961 "getProgramAccountsV2 requires `programId`",
962 );
963 };
964
965 let mut cfg = serde_json::Map::new();
966 for key in [
968 "encoding",
969 "commitment",
970 "filters",
971 "dataSlice",
972 "minContextSlot",
973 ] {
974 if let Some(v) = req.params.get(key) {
975 cfg.insert(key.to_string(), v.clone());
976 }
977 }
978 let params = json!([program_id, Value::Object(cfg)]);
979
980 let raw = match ctx.upstream.rpc_call("getProgramAccounts", params).await {
981 Ok(r) => r,
982 Err(e) => {
983 return fail(
984 &req.id,
985 codes::INTERNAL_ERROR,
986 format!("upstream getProgramAccounts failed: {e}"),
987 );
988 }
989 };
990
991 let cursor = req
992 .params
993 .get("cursor")
994 .and_then(Value::as_str)
995 .map(String::from);
996 let limit = req
997 .params
998 .get("limit")
999 .and_then(Value::as_u64)
1000 .unwrap_or(1000);
1001
1002 let response = build_cursor_page(&raw, cursor.as_deref(), limit);
1003 ok(&req.id, response)
1004}
1005
1006async fn handle_get_token_accounts_by_owner_v2<S, C, U>(
1009 ctx: &Ctx<S, C, U>,
1010 req: &JsonRpcRequest,
1011) -> Value
1012where
1013 S: CnftStore + ?Sized,
1014 C: CacheStore + ?Sized,
1015 U: UpstreamClient + ?Sized + 'static,
1016{
1017 let Some(owner) = req
1018 .params
1019 .get("owner")
1020 .and_then(Value::as_str)
1021 .map(String::from)
1022 else {
1023 return fail(
1024 &req.id,
1025 codes::INVALID_PARAMS,
1026 "getTokenAccountsByOwnerV2 requires `owner`",
1027 );
1028 };
1029
1030 let filter_obj = if let Some(mint) = req.params.get("mint").and_then(Value::as_str) {
1033 json!({ "mint": mint })
1034 } else if let Some(program_id) = req.params.get("programId").and_then(Value::as_str) {
1035 json!({ "programId": program_id })
1036 } else {
1037 return fail(
1038 &req.id,
1039 codes::INVALID_PARAMS,
1040 "getTokenAccountsByOwnerV2 requires `mint` or `programId`",
1041 );
1042 };
1043
1044 let mut cfg = serde_json::Map::new();
1045 for key in ["encoding", "commitment", "minContextSlot"] {
1046 if let Some(v) = req.params.get(key) {
1047 cfg.insert(key.to_string(), v.clone());
1048 }
1049 }
1050 let params = json!([owner, filter_obj, Value::Object(cfg)]);
1051
1052 let raw = match ctx
1053 .upstream
1054 .rpc_call("getTokenAccountsByOwner", params)
1055 .await
1056 {
1057 Ok(r) => r,
1058 Err(e) => {
1059 return fail(
1060 &req.id,
1061 codes::INTERNAL_ERROR,
1062 format!("upstream getTokenAccountsByOwner failed: {e}"),
1063 );
1064 }
1065 };
1066
1067 let cursor = req
1068 .params
1069 .get("cursor")
1070 .and_then(Value::as_str)
1071 .map(String::from);
1072 let limit = req
1073 .params
1074 .get("limit")
1075 .and_then(Value::as_u64)
1076 .unwrap_or(1000);
1077
1078 let response = build_cursor_page(&raw, cursor.as_deref(), limit);
1079 ok(&req.id, response)
1080}
1081
1082fn build_cursor_page(raw: &[u8], cursor: Option<&str>, limit: u64) -> Value {
1087 let parsed: Value = serde_json::from_slice(raw).unwrap_or(Value::Null);
1088 let array = if let Some(inner) = parsed.get("value") {
1089 inner.as_array().cloned().unwrap_or_default()
1090 } else {
1091 parsed.as_array().cloned().unwrap_or_default()
1092 };
1093
1094 let mut sorted = array;
1095 sorted.sort_by(|a, b| {
1096 let ak = a.get("pubkey").and_then(Value::as_str).unwrap_or("");
1097 let bk = b.get("pubkey").and_then(Value::as_str).unwrap_or("");
1098 ak.cmp(bk)
1099 });
1100
1101 let mut filtered: Vec<Value> = if let Some(c) = cursor {
1103 sorted
1104 .into_iter()
1105 .filter(|entry| {
1106 entry
1107 .get("pubkey")
1108 .and_then(Value::as_str)
1109 .is_some_and(|pk| pk > c)
1110 })
1111 .collect()
1112 } else {
1113 sorted
1114 };
1115
1116 let limit_usize = usize::try_from(limit.max(1)).unwrap_or(1000);
1118 let has_more = filtered.len() > limit_usize;
1119 filtered.truncate(limit_usize);
1120 let next_cursor = if has_more {
1121 filtered
1122 .last()
1123 .and_then(|e| e.get("pubkey"))
1124 .and_then(Value::as_str)
1125 .map(String::from)
1126 } else {
1127 None
1128 };
1129
1130 match next_cursor {
1131 Some(c) => json!({ "items": filtered, "cursor": c }),
1132 None => json!({ "items": filtered }),
1133 }
1134}
1135
1136async fn handle_send_transaction_with_sender<S, C, U>(
1148 ctx: &Ctx<S, C, U>,
1149 req: &JsonRpcRequest,
1150) -> Value
1151where
1152 S: CnftStore + ?Sized,
1153 C: CacheStore + ?Sized,
1154 U: UpstreamClient + ?Sized + 'static,
1155{
1156 match ctx
1157 .upstream
1158 .rpc_call("sendTransaction", req.params.clone())
1159 .await
1160 {
1161 Ok(raw) => {
1162 let result: Value = serde_json::from_slice(&raw).unwrap_or(Value::Null);
1163 ok(&req.id, result)
1164 }
1165 Err(e) => fail(
1166 &req.id,
1167 codes::INTERNAL_ERROR,
1168 format!("upstream sendTransaction failed: {e}"),
1169 ),
1170 }
1171}
1172
1173async fn handle_get_priority_fee_estimate<S, C, U>(
1183 ctx: &Ctx<S, C, U>,
1184 req: &JsonRpcRequest,
1185) -> Value
1186where
1187 S: CnftStore + ?Sized,
1188 C: CacheStore + ?Sized,
1189 U: UpstreamClient + ?Sized + 'static,
1190{
1191 let params_obj = match &req.params {
1194 Value::Array(a) => a.first().cloned().unwrap_or(Value::Null),
1195 other => other.clone(),
1196 };
1197 let account_keys: Vec<String> = params_obj
1198 .get("accountKeys")
1199 .and_then(Value::as_array)
1200 .map(|arr| {
1201 arr.iter()
1202 .filter_map(|v| v.as_str().map(String::from))
1203 .collect()
1204 })
1205 .unwrap_or_default();
1206 let options = params_obj.get("options").cloned().unwrap_or(Value::Null);
1207 let include_all = options
1208 .get("includeAllPriorityFeeLevels")
1209 .and_then(Value::as_bool)
1210 .unwrap_or(false);
1211
1212 let upstream_params = if account_keys.is_empty() {
1216 json!([])
1217 } else {
1218 json!([account_keys])
1219 };
1220 let raw = match ctx
1221 .upstream
1222 .rpc_call("getRecentPrioritizationFees", upstream_params)
1223 .await
1224 {
1225 Ok(r) => r,
1226 Err(e) => {
1227 return fail(
1228 &req.id,
1229 codes::INTERNAL_ERROR,
1230 format!("upstream getRecentPrioritizationFees failed: {e}"),
1231 );
1232 }
1233 };
1234 let fees: Vec<u64> = serde_json::from_slice::<Value>(&raw)
1236 .ok()
1237 .and_then(|v| v.as_array().cloned())
1238 .map(|arr| {
1239 arr.iter()
1240 .filter_map(|entry| entry.get("prioritizationFee").and_then(Value::as_u64))
1241 .collect()
1242 })
1243 .unwrap_or_default();
1244
1245 let levels = compute_levels(&fees);
1246
1247 if include_all {
1248 ok(
1249 &req.id,
1250 json!({
1251 "priorityFeeLevels": levels,
1252 }),
1253 )
1254 } else {
1255 let level: PriorityLevel = options
1258 .get("priorityLevel")
1259 .and_then(Value::as_str)
1260 .and_then(|s| match s {
1261 "min" | "Min" => Some(PriorityLevel::Min),
1262 "low" | "Low" => Some(PriorityLevel::Low),
1263 "medium" | "Medium" => Some(PriorityLevel::Medium),
1264 "high" | "High" => Some(PriorityLevel::High),
1265 "veryHigh" | "VeryHigh" => Some(PriorityLevel::VeryHigh),
1266 "unsafeMax" | "UnsafeMax" => Some(PriorityLevel::UnsafeMax),
1267 _ => None,
1268 })
1269 .unwrap_or(PriorityLevel::Medium);
1270 let mut sorted = fees;
1271 sorted.sort_unstable();
1272 let estimate = percentile_at(&sorted, level);
1273 ok(
1274 &req.id,
1275 json!({
1276 "priorityFeeEstimate": estimate,
1277 }),
1278 )
1279 }
1280}
1281
1282#[allow(clippy::unused_async)]
1286async fn handle_tidepool_info<S, C, U>(_ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
1287where
1288 S: CnftStore + ?Sized,
1289 C: CacheStore + ?Sized,
1290 U: UpstreamClient + ?Sized + 'static,
1291{
1292 let methods = manifest();
1293 let summary = summarize(methods);
1294 ok(
1295 &req.id,
1296 json!({
1297 "name": "tidepool",
1298 "version": env!("CARGO_PKG_VERSION"),
1299 "methods": methods,
1300 "summary": summary,
1301 "compatibility": compatibility(),
1305 }),
1306 )
1307}
1308
1309async fn handle_tidepool_index_tree<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
1310where
1311 S: CnftStore + ?Sized,
1312 C: CacheStore + ?Sized,
1313 U: UpstreamClient + ?Sized + 'static,
1314{
1315 let Some(tree_b58) = req.params.get("tree").and_then(Value::as_str) else {
1316 return fail(&req.id, codes::INVALID_PARAMS, "missing `tree` param");
1317 };
1318 let Some(tree_bytes) = bs58_to_32(tree_b58) else {
1319 return fail(
1320 &req.id,
1321 codes::INVALID_PARAMS,
1322 "`tree` is not a valid 32-byte base58 address",
1323 );
1324 };
1325 let opts = IndexTreeOptions::default();
1326 match index_tree(&*ctx.upstream, &*ctx.cnft, tree_bytes, &opts).await {
1327 Ok(result) => ok(
1328 &req.id,
1329 json!({
1330 "tree": tree_b58,
1331 "processed": result.processed,
1332 "applied": result.applied,
1333 "skipped": result.skipped,
1334 }),
1335 ),
1336 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("Index failed: {e}")),
1337 }
1338}
1339
1340async fn handle_tidepool_export_tree_snapshot<S, C, U>(
1348 ctx: &Ctx<S, C, U>,
1349 req: &JsonRpcRequest,
1350) -> Value
1351where
1352 S: CnftStore + ?Sized,
1353 C: CacheStore + ?Sized,
1354 U: UpstreamClient + ?Sized + 'static,
1355{
1356 let Some(tree_b58) = req.params.get("tree").and_then(Value::as_str) else {
1357 return fail(&req.id, codes::INVALID_PARAMS, "missing `tree` param");
1358 };
1359 let Some(tree_bytes) = bs58_to_32(tree_b58) else {
1360 return fail(
1361 &req.id,
1362 codes::INVALID_PARAMS,
1363 "`tree` is not a valid 32-byte base58 address",
1364 );
1365 };
1366 match tidepool_rpc::cnft::dump_tree(&*ctx.cnft, &tree_bytes).await {
1367 Ok(Some(snapshot)) => {
1368 let blob = tidepool_rpc::cnft::SnapshotBlob::from_tree(&snapshot);
1369 ok(
1370 &req.id,
1371 json!({
1372 "tree": tree_b58,
1373 "leafCount": snapshot.leaves.len(),
1374 "lastSignature": snapshot.last_signature,
1375 "snapshot": blob,
1376 }),
1377 )
1378 }
1379 Ok(None) => ok(&req.id, Value::Null),
1380 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("dump failed: {e}")),
1381 }
1382}
1383
1384async fn handle_tidepool_load_tree_snapshot<S, C, U>(
1387 ctx: &Ctx<S, C, U>,
1388 req: &JsonRpcRequest,
1389) -> Value
1390where
1391 S: CnftStore + ?Sized,
1392 C: CacheStore + ?Sized,
1393 U: UpstreamClient + ?Sized + 'static,
1394{
1395 let Some(snapshot_v) = req.params.get("snapshot") else {
1396 return fail(&req.id, codes::INVALID_PARAMS, "missing `snapshot` param");
1397 };
1398 let blob: tidepool_rpc::cnft::SnapshotBlob = match serde_json::from_value(snapshot_v.clone()) {
1399 Ok(b) => b,
1400 Err(e) => {
1401 return fail(
1402 &req.id,
1403 codes::INVALID_PARAMS,
1404 format!("snapshot envelope: {e}"),
1405 )
1406 }
1407 };
1408 let snapshot = match blob.into_tree_snapshot() {
1409 Ok(s) => s,
1410 Err(e) => return fail(&req.id, codes::INVALID_PARAMS, e),
1411 };
1412 match tidepool_rpc::cnft::load_tree(&*ctx.cnft, snapshot).await {
1413 Ok(summary) => ok(
1414 &req.id,
1415 json!({
1416 "tree": bs58::encode(summary.tree).into_string(),
1417 "leafCount": summary.leaf_count,
1418 }),
1419 ),
1420 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("load failed: {e}")),
1421 }
1422}
1423
1424fn extract_id_param(params: &Value) -> Option<String> {
1427 if let Some(id) = params.get("id").and_then(Value::as_str) {
1429 return Some(id.to_string());
1430 }
1431 if let Some(arr) = params.as_array() {
1432 if let Some(id) = arr.first().and_then(Value::as_str) {
1433 return Some(id.to_string());
1434 }
1435 }
1436 None
1437}
1438
1439fn bs58_to_32(s: &str) -> Option<[u8; 32]> {
1440 let v = bs58::decode(s).into_vec().ok()?;
1441 v.try_into().ok()
1442}