1use std::sync::Arc;
10
11use serde_json::{json, Value};
12use tracing::warn;
13
14use tidepool_rpc::cache::{CacheStore, SearchFilter};
15use tidepool_rpc::cnft::{index_tree, CnftStore, IndexTreeOptions};
16use tidepool_rpc::compat::{manifest, summarize};
17use tidepool_rpc::compatibility::compatibility;
18use tidepool_rpc::das::{
19 get_asset_full, get_asset_proof, get_asset_proof_batch, get_assets_by_authority,
20 get_assets_by_creator, get_assets_by_group, get_assets_by_owner, get_balances,
21 get_nft_editions, get_token_accounts, search_assets, AccountDecoder, TokenAccountsFilter,
22};
23use tidepool_rpc::enhanced::{
24 enrich_token_standards, get_transactions, get_transactions_by_address,
25 TransactionsByAddressOptions,
26};
27use tidepool_rpc::priority_fee::{compute_levels, percentile_at, PriorityLevel};
28use tidepool_rpc::upstream::UpstreamClient;
29use tidepool_rpc::webhooks::{PostClient, WebhookError, WebhookInput};
30
31use crate::json_rpc::{codes, fail, ok, JsonRpcRequest};
32use crate::webhook_runtime::WebhookRuntime;
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37pub enum Method {
38 GetAsset,
40 GetAssetBatch,
41 GetAssetProof,
42 GetAssetProofBatch,
43 GetAssetsByOwner,
44 GetAssetsByAuthority,
45 GetAssetsByCreator,
46 GetAssetsByGroup,
47 SearchAssets,
48 GetNftEditions,
49 GetTokenAccounts,
50 GetProgramAccountsV2,
52 GetTokenAccountsByOwnerV2,
53 GetPriorityFeeEstimate,
62 SendTransactionWithSender,
63 TidepoolInfo,
65 TidepoolIndexTree,
66 TidepoolExportTreeSnapshot,
67 TidepoolLoadTreeSnapshot,
68}
69
70impl Method {
71 #[must_use]
75 pub fn from_wire(name: &str) -> Option<Self> {
76 Some(match name {
77 "getAsset" => Self::GetAsset,
78 "getAssetBatch" => Self::GetAssetBatch,
79 "getAssetProof" => Self::GetAssetProof,
80 "getAssetProofBatch" => Self::GetAssetProofBatch,
81 "getAssetsByOwner" => Self::GetAssetsByOwner,
82 "getAssetsByAuthority" => Self::GetAssetsByAuthority,
83 "getAssetsByCreator" => Self::GetAssetsByCreator,
84 "getAssetsByGroup" => Self::GetAssetsByGroup,
85 "searchAssets" => Self::SearchAssets,
86 "getNftEditions" => Self::GetNftEditions,
87 "getTokenAccounts" => Self::GetTokenAccounts,
88 "getProgramAccountsV2" => Self::GetProgramAccountsV2,
89 "getTokenAccountsByOwnerV2" => Self::GetTokenAccountsByOwnerV2,
90 "getPriorityFeeEstimate" => Self::GetPriorityFeeEstimate,
91 "sendTransactionWithSender" => Self::SendTransactionWithSender,
92 "tidepool_info" => Self::TidepoolInfo,
93 "tidepool_indexTree" => Self::TidepoolIndexTree,
94 "tidepool_exportTreeSnapshot" => Self::TidepoolExportTreeSnapshot,
95 "tidepool_loadTreeSnapshot" => Self::TidepoolLoadTreeSnapshot,
96 _ => return None,
97 })
98 }
99
100 #[must_use]
102 pub fn to_wire(self) -> &'static str {
103 match self {
104 Self::GetAsset => "getAsset",
105 Self::GetAssetBatch => "getAssetBatch",
106 Self::GetAssetProof => "getAssetProof",
107 Self::GetAssetProofBatch => "getAssetProofBatch",
108 Self::GetAssetsByOwner => "getAssetsByOwner",
109 Self::GetAssetsByAuthority => "getAssetsByAuthority",
110 Self::GetAssetsByCreator => "getAssetsByCreator",
111 Self::GetAssetsByGroup => "getAssetsByGroup",
112 Self::SearchAssets => "searchAssets",
113 Self::GetNftEditions => "getNftEditions",
114 Self::GetTokenAccounts => "getTokenAccounts",
115 Self::GetProgramAccountsV2 => "getProgramAccountsV2",
116 Self::GetTokenAccountsByOwnerV2 => "getTokenAccountsByOwnerV2",
117 Self::GetPriorityFeeEstimate => "getPriorityFeeEstimate",
118 Self::SendTransactionWithSender => "sendTransactionWithSender",
119 Self::TidepoolInfo => "tidepool_info",
120 Self::TidepoolIndexTree => "tidepool_indexTree",
121 Self::TidepoolExportTreeSnapshot => "tidepool_exportTreeSnapshot",
122 Self::TidepoolLoadTreeSnapshot => "tidepool_loadTreeSnapshot",
123 }
124 }
125}
126
127pub struct Ctx<S, C, U>
130where
131 S: CnftStore + ?Sized,
132 C: CacheStore + ?Sized,
133 U: UpstreamClient + ?Sized + 'static,
134{
135 pub cnft: Arc<S>,
136 pub cache: Arc<C>,
137 pub upstream: Arc<U>,
138 pub decoders: Arc<[Arc<dyn AccountDecoder>]>,
139 pub webhooks: Arc<WebhookRuntime<U, dyn PostClient>>,
140}
141
142impl<S, C, U> Clone for Ctx<S, C, U>
143where
144 S: CnftStore + ?Sized,
145 C: CacheStore + ?Sized,
146 U: UpstreamClient + ?Sized + 'static,
147{
148 fn clone(&self) -> Self {
149 Self {
150 cnft: Arc::clone(&self.cnft),
151 cache: Arc::clone(&self.cache),
152 upstream: Arc::clone(&self.upstream),
153 decoders: Arc::clone(&self.decoders),
154 webhooks: Arc::clone(&self.webhooks),
155 }
156 }
157}
158
159pub async fn dispatch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Option<Value>
162where
163 S: CnftStore + ?Sized,
164 C: CacheStore + ?Sized,
165 U: UpstreamClient + ?Sized + 'static,
166{
167 let method = Method::from_wire(&req.method)?;
168 Some(match method {
169 Method::GetAsset => handle_get_asset(ctx, req).await,
170 Method::GetAssetBatch => handle_get_asset_batch(ctx, req).await,
171 Method::GetAssetProof => handle_get_asset_proof(ctx, req).await,
172 Method::GetAssetProofBatch => handle_get_asset_proof_batch(ctx, req).await,
173 Method::GetAssetsByOwner => handle_get_assets_by_owner(ctx, req).await,
174 Method::GetAssetsByAuthority => handle_get_assets_by_authority(ctx, req).await,
175 Method::GetAssetsByCreator => handle_get_assets_by_creator(ctx, req).await,
176 Method::GetAssetsByGroup => handle_get_assets_by_group(ctx, req).await,
177 Method::SearchAssets => handle_search_assets(ctx, req).await,
178 Method::GetNftEditions => handle_get_nft_editions(ctx, req).await,
179 Method::GetTokenAccounts => handle_get_token_accounts(ctx, req).await,
180 Method::GetProgramAccountsV2 => handle_get_program_accounts_v2(ctx, req).await,
186 Method::GetTokenAccountsByOwnerV2 => handle_get_token_accounts_by_owner_v2(ctx, req).await,
187 Method::GetPriorityFeeEstimate => handle_get_priority_fee_estimate(ctx, req).await,
188 Method::SendTransactionWithSender => handle_send_transaction_with_sender(ctx, req).await,
189 Method::TidepoolInfo => handle_tidepool_info(ctx, req).await,
190 Method::TidepoolIndexTree => handle_tidepool_index_tree(ctx, req).await,
191 Method::TidepoolExportTreeSnapshot => handle_tidepool_export_tree_snapshot(ctx, req).await,
192 Method::TidepoolLoadTreeSnapshot => handle_tidepool_load_tree_snapshot(ctx, req).await,
193 })
194}
195
196async fn handle_get_asset<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
199where
200 S: CnftStore + ?Sized,
201 C: CacheStore + ?Sized,
202 U: UpstreamClient + ?Sized + 'static,
203{
204 let Some(asset_id) = extract_id_param(&req.params) else {
205 return fail(&req.id, codes::INVALID_PARAMS, "missing `id` param");
206 };
207 match get_asset_full(
208 &*ctx.cnft,
209 &*ctx.cache,
210 &*ctx.upstream,
211 &ctx.decoders,
212 &asset_id,
213 )
214 .await
215 {
216 Ok(Some(asset)) => ok(&req.id, serde_json::to_value(asset).unwrap_or(Value::Null)),
217 Ok(None) => fail(&req.id, codes::INTERNAL_ERROR, "Asset not found"),
218 Err(e) => {
219 warn!(method = "getAsset", err = %e, "handler failed");
220 fail(&req.id, codes::INTERNAL_ERROR, format!("{e}"))
221 }
222 }
223}
224
225async fn handle_get_asset_batch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
226where
227 S: CnftStore + ?Sized,
228 C: CacheStore + ?Sized,
229 U: UpstreamClient + ?Sized + 'static,
230{
231 let Some(ids) = req.params.get("ids").and_then(Value::as_array) else {
232 return fail(&req.id, codes::INVALID_PARAMS, "missing `ids` array");
233 };
234 let ids: Vec<String> = ids
235 .iter()
236 .filter_map(|v| v.as_str().map(String::from))
237 .collect();
238 match tidepool_rpc::das::get_asset_batch(
239 &*ctx.cnft,
240 &*ctx.cache,
241 &*ctx.upstream,
242 &ctx.decoders,
243 &ids,
244 )
245 .await
246 {
247 Ok(results) => ok(
248 &req.id,
249 serde_json::to_value(results).unwrap_or(Value::Null),
250 ),
251 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
252 }
253}
254
255async fn handle_get_asset_proof<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
256where
257 S: CnftStore + ?Sized,
258 C: CacheStore + ?Sized,
259 U: UpstreamClient + ?Sized + 'static,
260{
261 let Some(asset_id) = extract_id_param(&req.params) else {
262 return fail(&req.id, codes::INVALID_PARAMS, "missing `id` param");
263 };
264 let Some(id_bytes) = bs58_to_32(&asset_id) else {
265 return fail(
266 &req.id,
267 codes::INVALID_PARAMS,
268 "`id` is not a valid 32-byte base58 address",
269 );
270 };
271 match get_asset_proof(&*ctx.cnft, &id_bytes).await {
272 Ok(Some(p)) => ok(&req.id, serde_json::to_value(p).unwrap_or(Value::Null)),
273 Ok(None) => fail(
274 &req.id,
275 codes::INTERNAL_ERROR,
276 "Asset not found or tree not indexed",
277 ),
278 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
279 }
280}
281
282async fn handle_get_asset_proof_batch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
283where
284 S: CnftStore + ?Sized,
285 C: CacheStore + ?Sized,
286 U: UpstreamClient + ?Sized + 'static,
287{
288 let Some(ids) = req.params.get("ids").and_then(Value::as_array) else {
289 return fail(&req.id, codes::INVALID_PARAMS, "missing `ids` array");
290 };
291 let id_bytes: Vec<[u8; 32]> = ids
292 .iter()
293 .filter_map(|v| v.as_str())
294 .filter_map(bs58_to_32)
295 .collect();
296 match get_asset_proof_batch(&*ctx.cnft, &id_bytes).await {
297 Ok(results) => {
298 let map: serde_json::Map<String, Value> = ids
299 .iter()
300 .filter_map(|v| v.as_str())
301 .zip(results.into_iter())
302 .map(|(id, proof)| {
303 (
304 id.to_string(),
305 proof.map_or(Value::Null, |p| {
306 serde_json::to_value(p).unwrap_or(Value::Null)
307 }),
308 )
309 })
310 .collect();
311 ok(&req.id, Value::Object(map))
312 }
313 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
314 }
315}
316
317async fn handle_get_assets_by_owner<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
318where
319 S: CnftStore + ?Sized,
320 C: CacheStore + ?Sized,
321 U: UpstreamClient + ?Sized + 'static,
322{
323 let Some(owner) = req.params.get("ownerAddress").and_then(Value::as_str) else {
324 return fail(&req.id, codes::INVALID_PARAMS, "missing `ownerAddress`");
325 };
326 match get_assets_by_owner(&*ctx.cache, owner).await {
327 Ok(items) => ok(&req.id, serde_json::json!({ "items": items })),
328 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
329 }
330}
331
332async fn handle_get_assets_by_authority<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
333where
334 S: CnftStore + ?Sized,
335 C: CacheStore + ?Sized,
336 U: UpstreamClient + ?Sized + 'static,
337{
338 let Some(authority) = req.params.get("authorityAddress").and_then(Value::as_str) else {
339 return fail(&req.id, codes::INVALID_PARAMS, "missing `authorityAddress`");
340 };
341 match get_assets_by_authority(&*ctx.cache, authority).await {
342 Ok(items) => ok(&req.id, json!({ "items": items })),
343 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
344 }
345}
346
347async fn handle_get_assets_by_creator<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
348where
349 S: CnftStore + ?Sized,
350 C: CacheStore + ?Sized,
351 U: UpstreamClient + ?Sized + 'static,
352{
353 let Some(creator) = req.params.get("creatorAddress").and_then(Value::as_str) else {
354 return fail(&req.id, codes::INVALID_PARAMS, "missing `creatorAddress`");
355 };
356 let only_verified = req
357 .params
358 .get("onlyVerified")
359 .and_then(Value::as_bool)
360 .unwrap_or(false);
361 match get_assets_by_creator(&*ctx.cache, creator, only_verified).await {
362 Ok(items) => ok(&req.id, json!({ "items": items })),
363 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
364 }
365}
366
367async fn handle_get_assets_by_group<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
368where
369 S: CnftStore + ?Sized,
370 C: CacheStore + ?Sized,
371 U: UpstreamClient + ?Sized + 'static,
372{
373 let gk = req.params.get("groupKey").and_then(Value::as_str);
374 let gv = req.params.get("groupValue").and_then(Value::as_str);
375 let (Some(gk), Some(gv)) = (gk, gv) else {
376 return fail(
377 &req.id,
378 codes::INVALID_PARAMS,
379 "missing `groupKey` / `groupValue`",
380 );
381 };
382 match get_assets_by_group(&*ctx.cache, gk, gv).await {
383 Ok(items) => ok(&req.id, json!({ "items": items })),
384 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
385 }
386}
387
388async fn handle_search_assets<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
389where
390 S: CnftStore + ?Sized,
391 C: CacheStore + ?Sized,
392 U: UpstreamClient + ?Sized + 'static,
393{
394 let filter = SearchFilter {
395 owner_address: req
396 .params
397 .get("ownerAddress")
398 .and_then(Value::as_str)
399 .map(String::from),
400 authority_address: req
401 .params
402 .get("authorityAddress")
403 .and_then(Value::as_str)
404 .map(String::from),
405 creator_address: req
406 .params
407 .get("creatorAddress")
408 .and_then(Value::as_str)
409 .map(String::from),
410 creator_verified: req.params.get("creatorVerified").and_then(Value::as_bool),
411 grouping: req
412 .params
413 .get("grouping")
414 .and_then(Value::as_array)
415 .and_then(|arr| {
416 let k = arr.first()?.as_str()?.to_string();
417 let v = arr.get(1)?.as_str()?.to_string();
418 Some((k, v))
419 }),
420 interface: req
421 .params
422 .get("interface")
423 .and_then(Value::as_str)
424 .map(String::from),
425 burnt: req.params.get("burnt").and_then(Value::as_bool),
426 };
427 match search_assets(&*ctx.cache, &filter).await {
428 Ok(items) => ok(&req.id, json!({ "items": items })),
429 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
430 }
431}
432
433async fn handle_get_nft_editions<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
438where
439 S: CnftStore + ?Sized,
440 C: CacheStore + ?Sized,
441 U: UpstreamClient + ?Sized + 'static,
442{
443 let mint = req
444 .params
445 .get("mint")
446 .or_else(|| req.params.get("id"))
447 .and_then(Value::as_str);
448 let Some(mint) = mint else {
449 return fail(
450 &req.id,
451 codes::INVALID_PARAMS,
452 "getNftEditions requires `mint`",
453 );
454 };
455 let page = req.params.get("page").and_then(Value::as_u64).unwrap_or(1);
456 let limit = req
458 .params
459 .get("limit")
460 .and_then(Value::as_u64)
461 .unwrap_or(100);
462
463 match get_nft_editions(
464 &*ctx.cache,
465 &*ctx.upstream,
466 &ctx.decoders,
467 mint,
468 page,
469 limit,
470 )
471 .await
472 {
473 Ok(Some(result)) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
474 Ok(None) => ok(&req.id, Value::Null),
475 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
476 }
477}
478
479async fn handle_get_token_accounts<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
484where
485 S: CnftStore + ?Sized,
486 C: CacheStore + ?Sized,
487 U: UpstreamClient + ?Sized + 'static,
488{
489 let owner = req
490 .params
491 .get("owner")
492 .and_then(Value::as_str)
493 .map(String::from);
494 let mint = req
495 .params
496 .get("mint")
497 .and_then(Value::as_str)
498 .map(String::from);
499 let page = req.params.get("page").and_then(Value::as_u64).unwrap_or(1);
500 let limit = req
501 .params
502 .get("limit")
503 .and_then(Value::as_u64)
504 .unwrap_or(100);
505 let show_zero_balance = req
506 .params
507 .pointer("/displayOptions/showZeroBalance")
508 .and_then(Value::as_bool)
509 .unwrap_or(false);
510
511 let filter = TokenAccountsFilter {
512 owner,
513 mint,
514 page,
515 limit,
516 show_zero_balance,
517 };
518
519 match get_token_accounts(&*ctx.upstream, &filter).await {
520 Ok(result) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
521 Err(e) => {
522 let code = match &e {
524 tidepool_rpc::das::DasError::BadRequest(_) => codes::INVALID_PARAMS,
525 _ => codes::INTERNAL_ERROR,
526 };
527 fail(&req.id, code, format!("{e}"))
528 }
529 }
530}
531
532pub(crate) async fn handle_get_balances<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
536where
537 S: CnftStore + ?Sized,
538 C: CacheStore + ?Sized,
539 U: UpstreamClient + ?Sized + 'static,
540{
541 let owner = req.params.get("owner").and_then(Value::as_str).or_else(|| {
543 req.params
544 .as_array()
545 .and_then(|a| a.first())
546 .and_then(Value::as_str)
547 });
548 let Some(owner) = owner else {
549 return fail(
550 &req.id,
551 codes::INVALID_PARAMS,
552 "getBalances requires `owner`",
553 );
554 };
555 match get_balances(&*ctx.upstream, owner).await {
556 Ok(result) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
557 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
558 }
559}
560
561fn parse_webhook_input(params: &Value) -> WebhookInput {
567 let url = params
571 .get("webhookURL")
572 .or_else(|| params.get("webhook_url"))
573 .and_then(Value::as_str)
574 .map(String::from);
575 let addresses = params
576 .get("accountAddresses")
577 .or_else(|| params.get("account_addresses"))
578 .and_then(Value::as_array)
579 .map(|a| {
580 a.iter()
581 .filter_map(|v| v.as_str().map(String::from))
582 .collect()
583 });
584 let transaction_types = params
585 .get("transactionTypes")
586 .or_else(|| params.get("transaction_types"))
587 .and_then(Value::as_array)
588 .map(|a| {
589 a.iter()
590 .filter_map(|v| v.as_str().map(String::from))
591 .collect()
592 })
593 .unwrap_or_default();
594 let txn_status = params
595 .get("txnStatus")
596 .or_else(|| params.get("txn_status"))
597 .and_then(Value::as_str)
598 .map(String::from);
599 let webhook_type = params
600 .get("webhookType")
601 .or_else(|| params.get("webhook_type"))
602 .and_then(Value::as_str)
603 .map(String::from);
604 let auth_header = params
605 .get("authHeader")
606 .or_else(|| params.get("auth_header"))
607 .and_then(Value::as_str)
608 .map(String::from);
609 WebhookInput {
610 webhook_url: url,
611 account_addresses: addresses,
612 transaction_types,
613 txn_status,
614 webhook_type,
615 auth_header,
616 }
617}
618
619fn webhook_error_to_response(id: &Value, e: &WebhookError) -> Value {
620 let code = match e {
621 WebhookError::BadRequest(_) => codes::INVALID_PARAMS,
622 WebhookError::NotFound { .. } => codes::INTERNAL_ERROR,
623 };
624 fail(id, code, format!("{e}"))
625}
626
627pub(crate) async fn handle_create_webhook<S, C, U>(
628 ctx: &Ctx<S, C, U>,
629 req: &JsonRpcRequest,
630) -> Value
631where
632 S: CnftStore + ?Sized,
633 C: CacheStore + ?Sized,
634 U: UpstreamClient + ?Sized + 'static,
635{
636 let input = parse_webhook_input(&req.params);
637 match ctx.webhooks.create(input).await {
638 Ok(wh) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
639 Err(e) => webhook_error_to_response(&req.id, &e),
640 }
641}
642
643pub(crate) async fn handle_get_all_webhooks<S, C, U>(
644 ctx: &Ctx<S, C, U>,
645 req: &JsonRpcRequest,
646) -> Value
647where
648 S: CnftStore + ?Sized,
649 C: CacheStore + ?Sized,
650 U: UpstreamClient + ?Sized + 'static,
651{
652 match ctx.webhooks.list().await {
653 Ok(items) => ok(&req.id, serde_json::to_value(items).unwrap_or(Value::Null)),
654 Err(e) => webhook_error_to_response(&req.id, &e),
655 }
656}
657
658pub(crate) async fn handle_get_webhook_by_id<S, C, U>(
659 ctx: &Ctx<S, C, U>,
660 req: &JsonRpcRequest,
661) -> Value
662where
663 S: CnftStore + ?Sized,
664 C: CacheStore + ?Sized,
665 U: UpstreamClient + ?Sized + 'static,
666{
667 let id = req
668 .params
669 .get("webhookID")
670 .or_else(|| req.params.get("webhook_id"))
671 .or_else(|| req.params.get("id"))
672 .and_then(Value::as_str);
673 let Some(id) = id else {
674 return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
675 };
676 match ctx.webhooks.get(id).await {
677 Ok(Some(wh)) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
678 Ok(None) => ok(&req.id, Value::Null),
679 Err(e) => webhook_error_to_response(&req.id, &e),
680 }
681}
682
683pub(crate) async fn handle_edit_webhook<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
684where
685 S: CnftStore + ?Sized,
686 C: CacheStore + ?Sized,
687 U: UpstreamClient + ?Sized + 'static,
688{
689 let id = req
690 .params
691 .get("webhookID")
692 .or_else(|| req.params.get("webhook_id"))
693 .or_else(|| req.params.get("id"))
694 .and_then(Value::as_str);
695 let Some(id) = id else {
696 return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
697 };
698 let input = parse_webhook_input(&req.params);
699 match ctx.webhooks.edit(id, input).await {
700 Ok(wh) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
701 Err(e) => webhook_error_to_response(&req.id, &e),
702 }
703}
704
705pub(crate) async fn handle_delete_webhook<S, C, U>(
706 ctx: &Ctx<S, C, U>,
707 req: &JsonRpcRequest,
708) -> Value
709where
710 S: CnftStore + ?Sized,
711 C: CacheStore + ?Sized,
712 U: UpstreamClient + ?Sized + 'static,
713{
714 let id = req
715 .params
716 .get("webhookID")
717 .or_else(|| req.params.get("webhook_id"))
718 .or_else(|| req.params.get("id"))
719 .and_then(Value::as_str);
720 let Some(id) = id else {
721 return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
722 };
723 match ctx.webhooks.delete(id).await {
724 Ok(removed) => ok(&req.id, json!({ "deleted": removed })),
725 Err(e) => webhook_error_to_response(&req.id, &e),
726 }
727}
728
729pub(crate) async fn handle_get_transactions<S, C, U>(
734 ctx: &Ctx<S, C, U>,
735 req: &JsonRpcRequest,
736) -> Value
737where
738 S: CnftStore + ?Sized,
739 C: CacheStore + ?Sized,
740 U: UpstreamClient + ?Sized + 'static,
741{
742 let sigs: Vec<String> = req
743 .params
744 .get("signatures")
745 .or_else(|| {
746 req.params.as_array().and_then(|a| a.first())
748 })
749 .and_then(Value::as_array)
750 .map(|arr| {
751 arr.iter()
752 .filter_map(|v| v.as_str().map(String::from))
753 .collect()
754 })
755 .unwrap_or_default();
756 if sigs.is_empty() {
757 return fail(
758 &req.id,
759 codes::INVALID_PARAMS,
760 "getTransactions requires a non-empty `signatures` array",
761 );
762 }
763 let mut out = get_transactions(&*ctx.upstream, &sigs).await;
764 enrich_token_standards(&*ctx.cache, &mut out).await;
768 ok(&req.id, serde_json::to_value(out).unwrap_or(Value::Null))
769}
770
771pub(crate) async fn handle_get_transactions_by_address<S, C, U>(
774 ctx: &Ctx<S, C, U>,
775 req: &JsonRpcRequest,
776) -> Value
777where
778 S: CnftStore + ?Sized,
779 C: CacheStore + ?Sized,
780 U: UpstreamClient + ?Sized + 'static,
781{
782 let Some(address) = req
783 .params
784 .get("address")
785 .and_then(Value::as_str)
786 .map(String::from)
787 else {
788 return fail(
789 &req.id,
790 codes::INVALID_PARAMS,
791 "getTransactionsByAddress requires `address`",
792 );
793 };
794 let options = TransactionsByAddressOptions {
795 before: req
796 .params
797 .get("before")
798 .and_then(Value::as_str)
799 .map(String::from),
800 until: req
801 .params
802 .get("until")
803 .and_then(Value::as_str)
804 .map(String::from),
805 limit: req.params.get("limit").and_then(Value::as_u64),
806 types: req
807 .params
808 .get("type")
809 .and_then(Value::as_str)
810 .map(|s| vec![s.to_string()])
811 .or_else(|| {
812 req.params
813 .get("types")
814 .and_then(Value::as_array)
815 .map(|arr| {
816 arr.iter()
817 .filter_map(|v| v.as_str().map(String::from))
818 .collect()
819 })
820 })
821 .unwrap_or_default(),
822 };
823 let mut out = get_transactions_by_address(&*ctx.upstream, &address, &options).await;
824 enrich_token_standards(&*ctx.cache, &mut out).await;
825 ok(&req.id, serde_json::to_value(out).unwrap_or(Value::Null))
826}
827
828async fn handle_get_program_accounts_v2<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
834where
835 S: CnftStore + ?Sized,
836 C: CacheStore + ?Sized,
837 U: UpstreamClient + ?Sized + 'static,
838{
839 let Some(program_id) = req
840 .params
841 .get("programId")
842 .and_then(Value::as_str)
843 .map(String::from)
844 else {
845 return fail(
846 &req.id,
847 codes::INVALID_PARAMS,
848 "getProgramAccountsV2 requires `programId`",
849 );
850 };
851
852 let mut cfg = serde_json::Map::new();
853 for key in [
855 "encoding",
856 "commitment",
857 "filters",
858 "dataSlice",
859 "minContextSlot",
860 ] {
861 if let Some(v) = req.params.get(key) {
862 cfg.insert(key.to_string(), v.clone());
863 }
864 }
865 let params = json!([program_id, Value::Object(cfg)]);
866
867 let raw = match ctx.upstream.rpc_call("getProgramAccounts", params).await {
868 Ok(r) => r,
869 Err(e) => {
870 return fail(
871 &req.id,
872 codes::INTERNAL_ERROR,
873 format!("upstream getProgramAccounts failed: {e}"),
874 );
875 }
876 };
877
878 let cursor = req
879 .params
880 .get("cursor")
881 .and_then(Value::as_str)
882 .map(String::from);
883 let limit = req
884 .params
885 .get("limit")
886 .and_then(Value::as_u64)
887 .unwrap_or(1000);
888
889 let response = build_cursor_page(&raw, cursor.as_deref(), limit);
890 ok(&req.id, response)
891}
892
893async fn handle_get_token_accounts_by_owner_v2<S, C, U>(
896 ctx: &Ctx<S, C, U>,
897 req: &JsonRpcRequest,
898) -> Value
899where
900 S: CnftStore + ?Sized,
901 C: CacheStore + ?Sized,
902 U: UpstreamClient + ?Sized + 'static,
903{
904 let Some(owner) = req
905 .params
906 .get("owner")
907 .and_then(Value::as_str)
908 .map(String::from)
909 else {
910 return fail(
911 &req.id,
912 codes::INVALID_PARAMS,
913 "getTokenAccountsByOwnerV2 requires `owner`",
914 );
915 };
916
917 let filter_obj = if let Some(mint) = req.params.get("mint").and_then(Value::as_str) {
920 json!({ "mint": mint })
921 } else if let Some(program_id) = req.params.get("programId").and_then(Value::as_str) {
922 json!({ "programId": program_id })
923 } else {
924 return fail(
925 &req.id,
926 codes::INVALID_PARAMS,
927 "getTokenAccountsByOwnerV2 requires `mint` or `programId`",
928 );
929 };
930
931 let mut cfg = serde_json::Map::new();
932 for key in ["encoding", "commitment", "minContextSlot"] {
933 if let Some(v) = req.params.get(key) {
934 cfg.insert(key.to_string(), v.clone());
935 }
936 }
937 let params = json!([owner, filter_obj, Value::Object(cfg)]);
938
939 let raw = match ctx
940 .upstream
941 .rpc_call("getTokenAccountsByOwner", params)
942 .await
943 {
944 Ok(r) => r,
945 Err(e) => {
946 return fail(
947 &req.id,
948 codes::INTERNAL_ERROR,
949 format!("upstream getTokenAccountsByOwner failed: {e}"),
950 );
951 }
952 };
953
954 let cursor = req
955 .params
956 .get("cursor")
957 .and_then(Value::as_str)
958 .map(String::from);
959 let limit = req
960 .params
961 .get("limit")
962 .and_then(Value::as_u64)
963 .unwrap_or(1000);
964
965 let response = build_cursor_page(&raw, cursor.as_deref(), limit);
966 ok(&req.id, response)
967}
968
969fn build_cursor_page(raw: &[u8], cursor: Option<&str>, limit: u64) -> Value {
974 let parsed: Value = serde_json::from_slice(raw).unwrap_or(Value::Null);
975 let array = if let Some(inner) = parsed.get("value") {
976 inner.as_array().cloned().unwrap_or_default()
977 } else {
978 parsed.as_array().cloned().unwrap_or_default()
979 };
980
981 let mut sorted = array;
982 sorted.sort_by(|a, b| {
983 let ak = a.get("pubkey").and_then(Value::as_str).unwrap_or("");
984 let bk = b.get("pubkey").and_then(Value::as_str).unwrap_or("");
985 ak.cmp(bk)
986 });
987
988 let mut filtered: Vec<Value> = if let Some(c) = cursor {
990 sorted
991 .into_iter()
992 .filter(|entry| {
993 entry
994 .get("pubkey")
995 .and_then(Value::as_str)
996 .is_some_and(|pk| pk > c)
997 })
998 .collect()
999 } else {
1000 sorted
1001 };
1002
1003 let limit_usize = usize::try_from(limit.max(1)).unwrap_or(1000);
1005 let has_more = filtered.len() > limit_usize;
1006 filtered.truncate(limit_usize);
1007 let next_cursor = if has_more {
1008 filtered
1009 .last()
1010 .and_then(|e| e.get("pubkey"))
1011 .and_then(Value::as_str)
1012 .map(String::from)
1013 } else {
1014 None
1015 };
1016
1017 match next_cursor {
1018 Some(c) => json!({ "items": filtered, "cursor": c }),
1019 None => json!({ "items": filtered }),
1020 }
1021}
1022
1023async fn handle_send_transaction_with_sender<S, C, U>(
1035 ctx: &Ctx<S, C, U>,
1036 req: &JsonRpcRequest,
1037) -> Value
1038where
1039 S: CnftStore + ?Sized,
1040 C: CacheStore + ?Sized,
1041 U: UpstreamClient + ?Sized + 'static,
1042{
1043 match ctx
1044 .upstream
1045 .rpc_call("sendTransaction", req.params.clone())
1046 .await
1047 {
1048 Ok(raw) => {
1049 let result: Value = serde_json::from_slice(&raw).unwrap_or(Value::Null);
1050 ok(&req.id, result)
1051 }
1052 Err(e) => fail(
1053 &req.id,
1054 codes::INTERNAL_ERROR,
1055 format!("upstream sendTransaction failed: {e}"),
1056 ),
1057 }
1058}
1059
1060async fn handle_get_priority_fee_estimate<S, C, U>(
1070 ctx: &Ctx<S, C, U>,
1071 req: &JsonRpcRequest,
1072) -> Value
1073where
1074 S: CnftStore + ?Sized,
1075 C: CacheStore + ?Sized,
1076 U: UpstreamClient + ?Sized + 'static,
1077{
1078 let params_obj = match &req.params {
1081 Value::Array(a) => a.first().cloned().unwrap_or(Value::Null),
1082 other => other.clone(),
1083 };
1084 let account_keys: Vec<String> = params_obj
1085 .get("accountKeys")
1086 .and_then(Value::as_array)
1087 .map(|arr| {
1088 arr.iter()
1089 .filter_map(|v| v.as_str().map(String::from))
1090 .collect()
1091 })
1092 .unwrap_or_default();
1093 let options = params_obj.get("options").cloned().unwrap_or(Value::Null);
1094 let include_all = options
1095 .get("includeAllPriorityFeeLevels")
1096 .and_then(Value::as_bool)
1097 .unwrap_or(false);
1098
1099 let upstream_params = if account_keys.is_empty() {
1103 json!([])
1104 } else {
1105 json!([account_keys])
1106 };
1107 let raw = match ctx
1108 .upstream
1109 .rpc_call("getRecentPrioritizationFees", upstream_params)
1110 .await
1111 {
1112 Ok(r) => r,
1113 Err(e) => {
1114 return fail(
1115 &req.id,
1116 codes::INTERNAL_ERROR,
1117 format!("upstream getRecentPrioritizationFees failed: {e}"),
1118 );
1119 }
1120 };
1121 let fees: Vec<u64> = serde_json::from_slice::<Value>(&raw)
1123 .ok()
1124 .and_then(|v| v.as_array().cloned())
1125 .map(|arr| {
1126 arr.iter()
1127 .filter_map(|entry| entry.get("prioritizationFee").and_then(Value::as_u64))
1128 .collect()
1129 })
1130 .unwrap_or_default();
1131
1132 let levels = compute_levels(&fees);
1133
1134 if include_all {
1135 ok(
1136 &req.id,
1137 json!({
1138 "priorityFeeLevels": levels,
1139 }),
1140 )
1141 } else {
1142 let level: PriorityLevel = options
1145 .get("priorityLevel")
1146 .and_then(Value::as_str)
1147 .and_then(|s| match s {
1148 "min" | "Min" => Some(PriorityLevel::Min),
1149 "low" | "Low" => Some(PriorityLevel::Low),
1150 "medium" | "Medium" => Some(PriorityLevel::Medium),
1151 "high" | "High" => Some(PriorityLevel::High),
1152 "veryHigh" | "VeryHigh" => Some(PriorityLevel::VeryHigh),
1153 "unsafeMax" | "UnsafeMax" => Some(PriorityLevel::UnsafeMax),
1154 _ => None,
1155 })
1156 .unwrap_or(PriorityLevel::Medium);
1157 let mut sorted = fees;
1158 sorted.sort_unstable();
1159 let estimate = percentile_at(&sorted, level);
1160 ok(
1161 &req.id,
1162 json!({
1163 "priorityFeeEstimate": estimate,
1164 }),
1165 )
1166 }
1167}
1168
1169#[allow(clippy::unused_async)]
1173async fn handle_tidepool_info<S, C, U>(_ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
1174where
1175 S: CnftStore + ?Sized,
1176 C: CacheStore + ?Sized,
1177 U: UpstreamClient + ?Sized + 'static,
1178{
1179 let methods = manifest();
1180 let summary = summarize(methods);
1181 ok(
1182 &req.id,
1183 json!({
1184 "name": "tidepool",
1185 "version": env!("CARGO_PKG_VERSION"),
1186 "methods": methods,
1187 "summary": summary,
1188 "compatibility": compatibility(),
1192 }),
1193 )
1194}
1195
1196async fn handle_tidepool_index_tree<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
1197where
1198 S: CnftStore + ?Sized,
1199 C: CacheStore + ?Sized,
1200 U: UpstreamClient + ?Sized + 'static,
1201{
1202 let Some(tree_b58) = req.params.get("tree").and_then(Value::as_str) else {
1203 return fail(&req.id, codes::INVALID_PARAMS, "missing `tree` param");
1204 };
1205 let Some(tree_bytes) = bs58_to_32(tree_b58) else {
1206 return fail(
1207 &req.id,
1208 codes::INVALID_PARAMS,
1209 "`tree` is not a valid 32-byte base58 address",
1210 );
1211 };
1212 let opts = IndexTreeOptions::default();
1213 match index_tree(&*ctx.upstream, &*ctx.cnft, tree_bytes, &opts).await {
1214 Ok(result) => ok(
1215 &req.id,
1216 json!({
1217 "tree": tree_b58,
1218 "processed": result.processed,
1219 "applied": result.applied,
1220 "skipped": result.skipped,
1221 }),
1222 ),
1223 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("Index failed: {e}")),
1224 }
1225}
1226
1227async fn handle_tidepool_export_tree_snapshot<S, C, U>(
1235 ctx: &Ctx<S, C, U>,
1236 req: &JsonRpcRequest,
1237) -> Value
1238where
1239 S: CnftStore + ?Sized,
1240 C: CacheStore + ?Sized,
1241 U: UpstreamClient + ?Sized + 'static,
1242{
1243 let Some(tree_b58) = req.params.get("tree").and_then(Value::as_str) else {
1244 return fail(&req.id, codes::INVALID_PARAMS, "missing `tree` param");
1245 };
1246 let Some(tree_bytes) = bs58_to_32(tree_b58) else {
1247 return fail(
1248 &req.id,
1249 codes::INVALID_PARAMS,
1250 "`tree` is not a valid 32-byte base58 address",
1251 );
1252 };
1253 match tidepool_rpc::cnft::dump_tree(&*ctx.cnft, &tree_bytes).await {
1254 Ok(Some(snapshot)) => {
1255 let blob = tidepool_rpc::cnft::SnapshotBlob::from_tree(&snapshot);
1256 ok(
1257 &req.id,
1258 json!({
1259 "tree": tree_b58,
1260 "leafCount": snapshot.leaves.len(),
1261 "lastSignature": snapshot.last_signature,
1262 "snapshot": blob,
1263 }),
1264 )
1265 }
1266 Ok(None) => ok(&req.id, Value::Null),
1267 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("dump failed: {e}")),
1268 }
1269}
1270
1271async fn handle_tidepool_load_tree_snapshot<S, C, U>(
1274 ctx: &Ctx<S, C, U>,
1275 req: &JsonRpcRequest,
1276) -> Value
1277where
1278 S: CnftStore + ?Sized,
1279 C: CacheStore + ?Sized,
1280 U: UpstreamClient + ?Sized + 'static,
1281{
1282 let Some(snapshot_v) = req.params.get("snapshot") else {
1283 return fail(&req.id, codes::INVALID_PARAMS, "missing `snapshot` param");
1284 };
1285 let blob: tidepool_rpc::cnft::SnapshotBlob = match serde_json::from_value(snapshot_v.clone()) {
1286 Ok(b) => b,
1287 Err(e) => {
1288 return fail(
1289 &req.id,
1290 codes::INVALID_PARAMS,
1291 format!("snapshot envelope: {e}"),
1292 )
1293 }
1294 };
1295 let snapshot = match blob.into_tree_snapshot() {
1296 Ok(s) => s,
1297 Err(e) => return fail(&req.id, codes::INVALID_PARAMS, e),
1298 };
1299 match tidepool_rpc::cnft::load_tree(&*ctx.cnft, snapshot).await {
1300 Ok(summary) => ok(
1301 &req.id,
1302 json!({
1303 "tree": bs58::encode(summary.tree).into_string(),
1304 "leafCount": summary.leaf_count,
1305 }),
1306 ),
1307 Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("load failed: {e}")),
1308 }
1309}
1310
1311fn extract_id_param(params: &Value) -> Option<String> {
1314 if let Some(id) = params.get("id").and_then(Value::as_str) {
1316 return Some(id.to_string());
1317 }
1318 if let Some(arr) = params.as_array() {
1319 if let Some(id) = arr.first().and_then(Value::as_str) {
1320 return Some(id.to_string());
1321 }
1322 }
1323 None
1324}
1325
1326fn bs58_to_32(s: &str) -> Option<[u8; 32]> {
1327 let v = bs58::decode(s).into_vec().ok()?;
1328 v.try_into().ok()
1329}