Skip to main content

tidepool_server/
dispatcher.rs

1//! JSON-RPC method dispatch. The core architectural win: every
2//! method we serve maps to a [`Method`] enum variant, and the dispatch
3//! function does an **exhaustive** match. Adding a new method = add
4//! a variant + a match arm; compiler fails the build if you forget.
5//!
//! Methods we don't recognize have no [`Method`] variant: `Method::from_wire`
//! returns `None`, `dispatch` returns `None`, and the caller forwards the raw
//! JSON-RPC envelope to the upstream.
8
9use std::sync::Arc;
10
11use serde_json::{json, Value};
12use tracing::warn;
13
14use tidepool_rpc::cache::{CacheStore, SearchFilter};
15use tidepool_rpc::cnft::{index_tree, CnftStore, IndexTreeOptions};
16use tidepool_rpc::compat::{manifest, summarize};
17use tidepool_rpc::compatibility::compatibility;
18use tidepool_rpc::das::{
19    get_asset_full, get_asset_proof, get_asset_proof_batch, get_assets_by_authority,
20    get_assets_by_creator, get_assets_by_group, get_assets_by_owner, get_balances,
21    get_nft_editions, get_token_accounts, search_assets, AccountDecoder, TokenAccountsFilter,
22};
23use tidepool_rpc::enhanced::{
24    enrich_token_standards, get_transactions, get_transactions_by_address,
25    get_transactions_for_address, get_transfers_by_address, Direction, Sort,
26    TransactionsByAddressOptions, TransactionsForAddressOptions, TransfersByAddressOptions,
27    TxStatus,
28};
29use tidepool_rpc::priority_fee::{compute_levels, percentile_at, PriorityLevel};
30use tidepool_rpc::upstream::UpstreamClient;
31use tidepool_rpc::webhooks::{PostClient, WebhookError, WebhookInput};
32
33use crate::json_rpc::{codes, fail, ok, JsonRpcRequest};
34use crate::webhook_runtime::WebhookRuntime;
35
/// Every method the server handles natively. Anything not listed here
/// is forwarded to the upstream unchanged.
///
/// Wire names are mapped to variants by `Method::from_wire` and back by
/// `Method::to_wire`; `dispatch` matches on this enum without a wildcard
/// arm, so a new variant must be wired up before the crate compiles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method {
    // DAS
    GetAsset,
    GetAssetBatch,
    GetAssetProof,
    GetAssetProofBatch,
    GetAssetsByOwner,
    GetAssetsByAuthority,
    GetAssetsByCreator,
    GetAssetsByGroup,
    SearchAssets,
    GetNftEditions,
    GetTokenAccounts,
    // Helius v2 (cursor-paginated)
    GetProgramAccountsV2,
    GetTokenAccountsByOwnerV2,
    // Helius Historical APIs (JSON-RPC, Apr–May 2026)
    GetTransfersByAddress,
    GetTransactionsForAddress,
    // NOTE: getBalances, the webhook CRUD methods (createWebhook,
    // getAllWebhooks, ...), getTransactions, and
    // getTransactionsByAddress (the older REST variant — superseded by
    // getTransactionsForAddress above) are deliberately absent from
    // this enum. Helius serves them via REST
    // (`api.helius.xyz/v0/...`), not JSON-RPC. Serving them here
    // would let users write local code that would fail against real
    // Helius. They live in `crate::rest` instead, routed to the
    // same handler functions (`pub(crate) handle_*`).
    // Tx (Helius-custom)
    GetPriorityFeeEstimate,
    SendTransactionWithSender,
    // Tidepool custom (wire names are prefixed with `tidepool_`)
    TidepoolInfo,
    TidepoolIndexTree,
    TidepoolExportTreeSnapshot,
    TidepoolLoadTreeSnapshot,
}
75
76impl Method {
77    /// Try to parse a wire method name. Returns `None` for
78    /// methods the server doesn't own — caller falls through to the
79    /// passthrough path.
80    #[must_use]
81    pub fn from_wire(name: &str) -> Option<Self> {
82        Some(match name {
83            "getAsset" => Self::GetAsset,
84            "getAssetBatch" => Self::GetAssetBatch,
85            "getAssetProof" => Self::GetAssetProof,
86            "getAssetProofBatch" => Self::GetAssetProofBatch,
87            "getAssetsByOwner" => Self::GetAssetsByOwner,
88            "getAssetsByAuthority" => Self::GetAssetsByAuthority,
89            "getAssetsByCreator" => Self::GetAssetsByCreator,
90            "getAssetsByGroup" => Self::GetAssetsByGroup,
91            "searchAssets" => Self::SearchAssets,
92            "getNftEditions" => Self::GetNftEditions,
93            "getTokenAccounts" => Self::GetTokenAccounts,
94            "getProgramAccountsV2" => Self::GetProgramAccountsV2,
95            "getTokenAccountsByOwnerV2" => Self::GetTokenAccountsByOwnerV2,
96            "getTransfersByAddress" => Self::GetTransfersByAddress,
97            "getTransactionsForAddress" => Self::GetTransactionsForAddress,
98            "getPriorityFeeEstimate" => Self::GetPriorityFeeEstimate,
99            "sendTransactionWithSender" => Self::SendTransactionWithSender,
100            "tidepool_info" => Self::TidepoolInfo,
101            "tidepool_indexTree" => Self::TidepoolIndexTree,
102            "tidepool_exportTreeSnapshot" => Self::TidepoolExportTreeSnapshot,
103            "tidepool_loadTreeSnapshot" => Self::TidepoolLoadTreeSnapshot,
104            _ => return None,
105        })
106    }
107
108    /// Reverse — useful for the compat manifest.
109    #[must_use]
110    pub fn to_wire(self) -> &'static str {
111        match self {
112            Self::GetAsset => "getAsset",
113            Self::GetAssetBatch => "getAssetBatch",
114            Self::GetAssetProof => "getAssetProof",
115            Self::GetAssetProofBatch => "getAssetProofBatch",
116            Self::GetAssetsByOwner => "getAssetsByOwner",
117            Self::GetAssetsByAuthority => "getAssetsByAuthority",
118            Self::GetAssetsByCreator => "getAssetsByCreator",
119            Self::GetAssetsByGroup => "getAssetsByGroup",
120            Self::SearchAssets => "searchAssets",
121            Self::GetNftEditions => "getNftEditions",
122            Self::GetTokenAccounts => "getTokenAccounts",
123            Self::GetProgramAccountsV2 => "getProgramAccountsV2",
124            Self::GetTokenAccountsByOwnerV2 => "getTokenAccountsByOwnerV2",
125            Self::GetTransfersByAddress => "getTransfersByAddress",
126            Self::GetTransactionsForAddress => "getTransactionsForAddress",
127            Self::GetPriorityFeeEstimate => "getPriorityFeeEstimate",
128            Self::SendTransactionWithSender => "sendTransactionWithSender",
129            Self::TidepoolInfo => "tidepool_info",
130            Self::TidepoolIndexTree => "tidepool_indexTree",
131            Self::TidepoolExportTreeSnapshot => "tidepool_exportTreeSnapshot",
132            Self::TidepoolLoadTreeSnapshot => "tidepool_loadTreeSnapshot",
133        }
134    }
135}
136
/// Shared request-handling context. Wired once at server start and
/// passed to every dispatch call.
///
/// Every field is held behind an `Arc`, so the context can be shared
/// between requests without copying the underlying stores.
pub struct Ctx<S, C, U>
where
    S: CnftStore + ?Sized,
    C: CacheStore + ?Sized,
    U: UpstreamClient + ?Sized + 'static,
{
    /// cNFT store handed to the asset and asset-proof lookups.
    pub cnft: Arc<S>,
    /// Cache handed to the `getAssetsBy*` / `searchAssets` lookups and
    /// to transaction enrichment.
    pub cache: Arc<C>,
    /// Client for the upstream RPC.
    pub upstream: Arc<U>,
    /// Account decoders handed to the asset and edition lookups.
    pub decoders: Arc<[Arc<dyn AccountDecoder>]>,
    /// Runtime that the webhook CRUD handlers delegate to.
    pub webhooks: Arc<WebhookRuntime<U, dyn PostClient>>,
}
151
152impl<S, C, U> Clone for Ctx<S, C, U>
153where
154    S: CnftStore + ?Sized,
155    C: CacheStore + ?Sized,
156    U: UpstreamClient + ?Sized + 'static,
157{
158    fn clone(&self) -> Self {
159        Self {
160            cnft: Arc::clone(&self.cnft),
161            cache: Arc::clone(&self.cache),
162            upstream: Arc::clone(&self.upstream),
163            decoders: Arc::clone(&self.decoders),
164            webhooks: Arc::clone(&self.webhooks),
165        }
166    }
167}
168
169/// Dispatch one JSON-RPC request. Returns `Some(response)` when we
170/// handled it natively, `None` when the caller should passthrough.
171pub async fn dispatch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Option<Value>
172where
173    S: CnftStore + ?Sized,
174    C: CacheStore + ?Sized,
175    U: UpstreamClient + ?Sized + 'static,
176{
177    let method = Method::from_wire(&req.method)?;
178    Some(match method {
179        Method::GetAsset => handle_get_asset(ctx, req).await,
180        Method::GetAssetBatch => handle_get_asset_batch(ctx, req).await,
181        Method::GetAssetProof => handle_get_asset_proof(ctx, req).await,
182        Method::GetAssetProofBatch => handle_get_asset_proof_batch(ctx, req).await,
183        Method::GetAssetsByOwner => handle_get_assets_by_owner(ctx, req).await,
184        Method::GetAssetsByAuthority => handle_get_assets_by_authority(ctx, req).await,
185        Method::GetAssetsByCreator => handle_get_assets_by_creator(ctx, req).await,
186        Method::GetAssetsByGroup => handle_get_assets_by_group(ctx, req).await,
187        Method::SearchAssets => handle_search_assets(ctx, req).await,
188        Method::GetNftEditions => handle_get_nft_editions(ctx, req).await,
189        Method::GetTokenAccounts => handle_get_token_accounts(ctx, req).await,
190        // NOTE: getBalances, webhook CRUD, and Enhanced Transactions
191        // are reachable only via the REST router (`crate::rest`) —
192        // Helius serves them on `api.helius.xyz/v0/...`, not JSON-RPC.
193        // The handler functions still live in this file but aren't
194        // wired to any `Method` variant here.
195        Method::GetProgramAccountsV2 => handle_get_program_accounts_v2(ctx, req).await,
196        Method::GetTokenAccountsByOwnerV2 => handle_get_token_accounts_by_owner_v2(ctx, req).await,
197        Method::GetTransfersByAddress => handle_get_transfers_by_address(ctx, req).await,
198        Method::GetTransactionsForAddress => handle_get_transactions_for_address(ctx, req).await,
199        Method::GetPriorityFeeEstimate => handle_get_priority_fee_estimate(ctx, req).await,
200        Method::SendTransactionWithSender => handle_send_transaction_with_sender(ctx, req).await,
201        Method::TidepoolInfo => handle_tidepool_info(ctx, req).await,
202        Method::TidepoolIndexTree => handle_tidepool_index_tree(ctx, req).await,
203        Method::TidepoolExportTreeSnapshot => handle_tidepool_export_tree_snapshot(ctx, req).await,
204        Method::TidepoolLoadTreeSnapshot => handle_tidepool_load_tree_snapshot(ctx, req).await,
205    })
206}
207
208// ─── per-method handlers ──────────────────────────────────────────
209
210async fn handle_get_asset<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
211where
212    S: CnftStore + ?Sized,
213    C: CacheStore + ?Sized,
214    U: UpstreamClient + ?Sized + 'static,
215{
216    let Some(asset_id) = extract_id_param(&req.params) else {
217        return fail(&req.id, codes::INVALID_PARAMS, "missing `id` param");
218    };
219    match get_asset_full(
220        &*ctx.cnft,
221        &*ctx.cache,
222        &*ctx.upstream,
223        &ctx.decoders,
224        &asset_id,
225    )
226    .await
227    {
228        Ok(Some(asset)) => ok(&req.id, serde_json::to_value(asset).unwrap_or(Value::Null)),
229        Ok(None) => fail(&req.id, codes::INTERNAL_ERROR, "Asset not found"),
230        Err(e) => {
231            warn!(method = "getAsset", err = %e, "handler failed");
232            fail(&req.id, codes::INTERNAL_ERROR, format!("{e}"))
233        }
234    }
235}
236
237async fn handle_get_asset_batch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
238where
239    S: CnftStore + ?Sized,
240    C: CacheStore + ?Sized,
241    U: UpstreamClient + ?Sized + 'static,
242{
243    let Some(ids) = req.params.get("ids").and_then(Value::as_array) else {
244        return fail(&req.id, codes::INVALID_PARAMS, "missing `ids` array");
245    };
246    let ids: Vec<String> = ids
247        .iter()
248        .filter_map(|v| v.as_str().map(String::from))
249        .collect();
250    match tidepool_rpc::das::get_asset_batch(
251        &*ctx.cnft,
252        &*ctx.cache,
253        &*ctx.upstream,
254        &ctx.decoders,
255        &ids,
256    )
257    .await
258    {
259        Ok(results) => ok(
260            &req.id,
261            serde_json::to_value(results).unwrap_or(Value::Null),
262        ),
263        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
264    }
265}
266
267async fn handle_get_asset_proof<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
268where
269    S: CnftStore + ?Sized,
270    C: CacheStore + ?Sized,
271    U: UpstreamClient + ?Sized + 'static,
272{
273    let Some(asset_id) = extract_id_param(&req.params) else {
274        return fail(&req.id, codes::INVALID_PARAMS, "missing `id` param");
275    };
276    let Some(id_bytes) = bs58_to_32(&asset_id) else {
277        return fail(
278            &req.id,
279            codes::INVALID_PARAMS,
280            "`id` is not a valid 32-byte base58 address",
281        );
282    };
283    match get_asset_proof(&*ctx.cnft, &id_bytes).await {
284        Ok(Some(p)) => ok(&req.id, serde_json::to_value(p).unwrap_or(Value::Null)),
285        Ok(None) => fail(
286            &req.id,
287            codes::INTERNAL_ERROR,
288            "Asset not found or tree not indexed",
289        ),
290        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
291    }
292}
293
294async fn handle_get_asset_proof_batch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
295where
296    S: CnftStore + ?Sized,
297    C: CacheStore + ?Sized,
298    U: UpstreamClient + ?Sized + 'static,
299{
300    let Some(ids) = req.params.get("ids").and_then(Value::as_array) else {
301        return fail(&req.id, codes::INVALID_PARAMS, "missing `ids` array");
302    };
303    let id_bytes: Vec<[u8; 32]> = ids
304        .iter()
305        .filter_map(|v| v.as_str())
306        .filter_map(bs58_to_32)
307        .collect();
308    match get_asset_proof_batch(&*ctx.cnft, &id_bytes).await {
309        Ok(results) => {
310            let map: serde_json::Map<String, Value> = ids
311                .iter()
312                .filter_map(|v| v.as_str())
313                .zip(results.into_iter())
314                .map(|(id, proof)| {
315                    (
316                        id.to_string(),
317                        proof.map_or(Value::Null, |p| {
318                            serde_json::to_value(p).unwrap_or(Value::Null)
319                        }),
320                    )
321                })
322                .collect();
323            ok(&req.id, Value::Object(map))
324        }
325        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
326    }
327}
328
329async fn handle_get_assets_by_owner<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
330where
331    S: CnftStore + ?Sized,
332    C: CacheStore + ?Sized,
333    U: UpstreamClient + ?Sized + 'static,
334{
335    let Some(owner) = req.params.get("ownerAddress").and_then(Value::as_str) else {
336        return fail(&req.id, codes::INVALID_PARAMS, "missing `ownerAddress`");
337    };
338    match get_assets_by_owner(&*ctx.cache, owner).await {
339        Ok(items) => ok(&req.id, serde_json::json!({ "items": items })),
340        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
341    }
342}
343
344async fn handle_get_assets_by_authority<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
345where
346    S: CnftStore + ?Sized,
347    C: CacheStore + ?Sized,
348    U: UpstreamClient + ?Sized + 'static,
349{
350    let Some(authority) = req.params.get("authorityAddress").and_then(Value::as_str) else {
351        return fail(&req.id, codes::INVALID_PARAMS, "missing `authorityAddress`");
352    };
353    match get_assets_by_authority(&*ctx.cache, authority).await {
354        Ok(items) => ok(&req.id, json!({ "items": items })),
355        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
356    }
357}
358
359async fn handle_get_assets_by_creator<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
360where
361    S: CnftStore + ?Sized,
362    C: CacheStore + ?Sized,
363    U: UpstreamClient + ?Sized + 'static,
364{
365    let Some(creator) = req.params.get("creatorAddress").and_then(Value::as_str) else {
366        return fail(&req.id, codes::INVALID_PARAMS, "missing `creatorAddress`");
367    };
368    let only_verified = req
369        .params
370        .get("onlyVerified")
371        .and_then(Value::as_bool)
372        .unwrap_or(false);
373    match get_assets_by_creator(&*ctx.cache, creator, only_verified).await {
374        Ok(items) => ok(&req.id, json!({ "items": items })),
375        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
376    }
377}
378
379async fn handle_get_assets_by_group<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
380where
381    S: CnftStore + ?Sized,
382    C: CacheStore + ?Sized,
383    U: UpstreamClient + ?Sized + 'static,
384{
385    let gk = req.params.get("groupKey").and_then(Value::as_str);
386    let gv = req.params.get("groupValue").and_then(Value::as_str);
387    let (Some(gk), Some(gv)) = (gk, gv) else {
388        return fail(
389            &req.id,
390            codes::INVALID_PARAMS,
391            "missing `groupKey` / `groupValue`",
392        );
393    };
394    match get_assets_by_group(&*ctx.cache, gk, gv).await {
395        Ok(items) => ok(&req.id, json!({ "items": items })),
396        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
397    }
398}
399
400async fn handle_search_assets<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
401where
402    S: CnftStore + ?Sized,
403    C: CacheStore + ?Sized,
404    U: UpstreamClient + ?Sized + 'static,
405{
406    let filter = SearchFilter {
407        owner_address: req
408            .params
409            .get("ownerAddress")
410            .and_then(Value::as_str)
411            .map(String::from),
412        authority_address: req
413            .params
414            .get("authorityAddress")
415            .and_then(Value::as_str)
416            .map(String::from),
417        creator_address: req
418            .params
419            .get("creatorAddress")
420            .and_then(Value::as_str)
421            .map(String::from),
422        creator_verified: req.params.get("creatorVerified").and_then(Value::as_bool),
423        grouping: req
424            .params
425            .get("grouping")
426            .and_then(Value::as_array)
427            .and_then(|arr| {
428                let k = arr.first()?.as_str()?.to_string();
429                let v = arr.get(1)?.as_str()?.to_string();
430                Some((k, v))
431            }),
432        interface: req
433            .params
434            .get("interface")
435            .and_then(Value::as_str)
436            .map(String::from),
437        burnt: req.params.get("burnt").and_then(Value::as_bool),
438    };
439    match search_assets(&*ctx.cache, &filter).await {
440        Ok(items) => ok(&req.id, json!({ "items": items })),
441        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
442    }
443}
444
445/// `helius.das.getNftEditions(mint, page, limit)` — serves from the
446/// local edition index populated on `getAsset` fetches. Cold-path
447/// calls do one upstream fetch of the master mint to warm the index;
448/// subsequent calls serve from cache.
449async fn handle_get_nft_editions<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
450where
451    S: CnftStore + ?Sized,
452    C: CacheStore + ?Sized,
453    U: UpstreamClient + ?Sized + 'static,
454{
455    let mint = req
456        .params
457        .get("mint")
458        .or_else(|| req.params.get("id"))
459        .and_then(Value::as_str);
460    let Some(mint) = mint else {
461        return fail(
462            &req.id,
463            codes::INVALID_PARAMS,
464            "getNftEditions requires `mint`",
465        );
466    };
467    let page = req.params.get("page").and_then(Value::as_u64).unwrap_or(1);
468    // Helius's default page size is 100.
469    let limit = req
470        .params
471        .get("limit")
472        .and_then(Value::as_u64)
473        .unwrap_or(100);
474
475    match get_nft_editions(
476        &*ctx.cache,
477        &*ctx.upstream,
478        &ctx.decoders,
479        mint,
480        page,
481        limit,
482    )
483    .await
484    {
485        Ok(Some(result)) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
486        Ok(None) => ok(&req.id, Value::Null),
487        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
488    }
489}
490
491/// `helius.das.getTokenAccounts(owner?, mint?, page?, limit?,
492/// displayOptions.showZeroBalance?)`. Shim — forwards to the upstream
493/// RPC (`getTokenAccountsByOwner` or `getProgramAccounts` memcmp),
494/// reshapes the response, and paginates locally.
495async fn handle_get_token_accounts<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
496where
497    S: CnftStore + ?Sized,
498    C: CacheStore + ?Sized,
499    U: UpstreamClient + ?Sized + 'static,
500{
501    let owner = req
502        .params
503        .get("owner")
504        .and_then(Value::as_str)
505        .map(String::from);
506    let mint = req
507        .params
508        .get("mint")
509        .and_then(Value::as_str)
510        .map(String::from);
511    let page = req.params.get("page").and_then(Value::as_u64).unwrap_or(1);
512    let limit = req
513        .params
514        .get("limit")
515        .and_then(Value::as_u64)
516        .unwrap_or(100);
517    let show_zero_balance = req
518        .params
519        .pointer("/displayOptions/showZeroBalance")
520        .and_then(Value::as_bool)
521        .unwrap_or(false);
522
523    let filter = TokenAccountsFilter {
524        owner,
525        mint,
526        page,
527        limit,
528        show_zero_balance,
529    };
530
531    match get_token_accounts(&*ctx.upstream, &filter).await {
532        Ok(result) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
533        Err(e) => {
534            // BadRequest → invalid-params code; everything else is internal.
535            let code = match &e {
536                tidepool_rpc::das::DasError::BadRequest(_) => codes::INVALID_PARAMS,
537                _ => codes::INTERNAL_ERROR,
538            };
539            fail(&req.id, code, format!("{e}"))
540        }
541    }
542}
543
544/// `helius.wallet.getBalances(owner)` — returns native SOL + all
545/// SPL/Token-2022 positions the wallet holds. Shim — fans out to
546/// `getBalance` + one `getTokenAccountsByOwner` per program.
547pub(crate) async fn handle_get_balances<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
548where
549    S: CnftStore + ?Sized,
550    C: CacheStore + ?Sized,
551    U: UpstreamClient + ?Sized + 'static,
552{
553    // Helius accepts either `owner` or positional `[owner]`.
554    let owner = req.params.get("owner").and_then(Value::as_str).or_else(|| {
555        req.params
556            .as_array()
557            .and_then(|a| a.first())
558            .and_then(Value::as_str)
559    });
560    let Some(owner) = owner else {
561        return fail(
562            &req.id,
563            codes::INVALID_PARAMS,
564            "getBalances requires `owner`",
565        );
566    };
567    match get_balances(&*ctx.upstream, owner).await {
568        Ok(result) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
569        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
570    }
571}
572
573// ─── Webhook CRUD ──────────────────────────────────────────────────
574// All five handlers front the `WebhookRuntime` on the Ctx — creation
575// spawns a per-webhook polling task; deletion aborts it; edit
576// restarts the task with the new config.
577
578fn parse_webhook_input(params: &Value) -> WebhookInput {
579    // Accept both Helius's camelCase (`webhookURL`, `accountAddresses`,
580    // `transactionTypes`, `txnStatus`, `webhookType`, `authHeader`)
581    // wire keys and our snake_case serde defaults.
582    let url = params
583        .get("webhookURL")
584        .or_else(|| params.get("webhook_url"))
585        .and_then(Value::as_str)
586        .map(String::from);
587    let addresses = params
588        .get("accountAddresses")
589        .or_else(|| params.get("account_addresses"))
590        .and_then(Value::as_array)
591        .map(|a| {
592            a.iter()
593                .filter_map(|v| v.as_str().map(String::from))
594                .collect()
595        });
596    let transaction_types = params
597        .get("transactionTypes")
598        .or_else(|| params.get("transaction_types"))
599        .and_then(Value::as_array)
600        .map(|a| {
601            a.iter()
602                .filter_map(|v| v.as_str().map(String::from))
603                .collect()
604        })
605        .unwrap_or_default();
606    let txn_status = params
607        .get("txnStatus")
608        .or_else(|| params.get("txn_status"))
609        .and_then(Value::as_str)
610        .map(String::from);
611    let webhook_type = params
612        .get("webhookType")
613        .or_else(|| params.get("webhook_type"))
614        .and_then(Value::as_str)
615        .map(String::from);
616    let auth_header = params
617        .get("authHeader")
618        .or_else(|| params.get("auth_header"))
619        .and_then(Value::as_str)
620        .map(String::from);
621    WebhookInput {
622        webhook_url: url,
623        account_addresses: addresses,
624        transaction_types,
625        txn_status,
626        webhook_type,
627        auth_header,
628    }
629}
630
631fn webhook_error_to_response(id: &Value, e: &WebhookError) -> Value {
632    let code = match e {
633        WebhookError::BadRequest(_) => codes::INVALID_PARAMS,
634        WebhookError::NotFound { .. } => codes::INTERNAL_ERROR,
635    };
636    fail(id, code, format!("{e}"))
637}
638
639pub(crate) async fn handle_create_webhook<S, C, U>(
640    ctx: &Ctx<S, C, U>,
641    req: &JsonRpcRequest,
642) -> Value
643where
644    S: CnftStore + ?Sized,
645    C: CacheStore + ?Sized,
646    U: UpstreamClient + ?Sized + 'static,
647{
648    let input = parse_webhook_input(&req.params);
649    match ctx.webhooks.create(input).await {
650        Ok(wh) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
651        Err(e) => webhook_error_to_response(&req.id, &e),
652    }
653}
654
655pub(crate) async fn handle_get_all_webhooks<S, C, U>(
656    ctx: &Ctx<S, C, U>,
657    req: &JsonRpcRequest,
658) -> Value
659where
660    S: CnftStore + ?Sized,
661    C: CacheStore + ?Sized,
662    U: UpstreamClient + ?Sized + 'static,
663{
664    match ctx.webhooks.list().await {
665        Ok(items) => ok(&req.id, serde_json::to_value(items).unwrap_or(Value::Null)),
666        Err(e) => webhook_error_to_response(&req.id, &e),
667    }
668}
669
670pub(crate) async fn handle_get_webhook_by_id<S, C, U>(
671    ctx: &Ctx<S, C, U>,
672    req: &JsonRpcRequest,
673) -> Value
674where
675    S: CnftStore + ?Sized,
676    C: CacheStore + ?Sized,
677    U: UpstreamClient + ?Sized + 'static,
678{
679    let id = req
680        .params
681        .get("webhookID")
682        .or_else(|| req.params.get("webhook_id"))
683        .or_else(|| req.params.get("id"))
684        .and_then(Value::as_str);
685    let Some(id) = id else {
686        return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
687    };
688    match ctx.webhooks.get(id).await {
689        Ok(Some(wh)) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
690        Ok(None) => ok(&req.id, Value::Null),
691        Err(e) => webhook_error_to_response(&req.id, &e),
692    }
693}
694
695pub(crate) async fn handle_edit_webhook<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
696where
697    S: CnftStore + ?Sized,
698    C: CacheStore + ?Sized,
699    U: UpstreamClient + ?Sized + 'static,
700{
701    let id = req
702        .params
703        .get("webhookID")
704        .or_else(|| req.params.get("webhook_id"))
705        .or_else(|| req.params.get("id"))
706        .and_then(Value::as_str);
707    let Some(id) = id else {
708        return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
709    };
710    let input = parse_webhook_input(&req.params);
711    match ctx.webhooks.edit(id, input).await {
712        Ok(wh) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
713        Err(e) => webhook_error_to_response(&req.id, &e),
714    }
715}
716
717pub(crate) async fn handle_delete_webhook<S, C, U>(
718    ctx: &Ctx<S, C, U>,
719    req: &JsonRpcRequest,
720) -> Value
721where
722    S: CnftStore + ?Sized,
723    C: CacheStore + ?Sized,
724    U: UpstreamClient + ?Sized + 'static,
725{
726    let id = req
727        .params
728        .get("webhookID")
729        .or_else(|| req.params.get("webhook_id"))
730        .or_else(|| req.params.get("id"))
731        .and_then(Value::as_str);
732    let Some(id) = id else {
733        return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
734    };
735    match ctx.webhooks.delete(id).await {
736        Ok(removed) => ok(&req.id, json!({ "deleted": removed })),
737        Err(e) => webhook_error_to_response(&req.id, &e),
738    }
739}
740
741// ─── Enhanced Transactions ─────────────────────────────────────────
742
743/// `helius.enhanced.getTransactions([signature, ...])`. Fans out one
744/// `getTransaction` per signature and classifies each.
745pub(crate) async fn handle_get_transactions<S, C, U>(
746    ctx: &Ctx<S, C, U>,
747    req: &JsonRpcRequest,
748) -> Value
749where
750    S: CnftStore + ?Sized,
751    C: CacheStore + ?Sized,
752    U: UpstreamClient + ?Sized + 'static,
753{
754    let sigs: Vec<String> = req
755        .params
756        .get("signatures")
757        .or_else(|| {
758            // positional fallback: first param may be the array.
759            req.params.as_array().and_then(|a| a.first())
760        })
761        .and_then(Value::as_array)
762        .map(|arr| {
763            arr.iter()
764                .filter_map(|v| v.as_str().map(String::from))
765                .collect()
766        })
767        .unwrap_or_default();
768    if sigs.is_empty() {
769        return fail(
770            &req.id,
771            codes::INVALID_PARAMS,
772            "getTransactions requires a non-empty `signatures` array",
773        );
774    }
775    let mut out = get_transactions(&*ctx.upstream, &sigs).await;
776    // Opportunistic enrichment: if a transfer's mint is already in
777    // the DAS cache, we know its tokenStandard without another
778    // upstream hop. Misses stay None (skip-on-serialize).
779    enrich_token_standards(&*ctx.cache, &mut out).await;
780    ok(&req.id, serde_json::to_value(out).unwrap_or(Value::Null))
781}
782
783/// `helius.enhanced.getTransactionsByAddress(address, options)`.
784/// Resolves signatures via `getSignaturesForAddress` then fans out.
785pub(crate) async fn handle_get_transactions_by_address<S, C, U>(
786    ctx: &Ctx<S, C, U>,
787    req: &JsonRpcRequest,
788) -> Value
789where
790    S: CnftStore + ?Sized,
791    C: CacheStore + ?Sized,
792    U: UpstreamClient + ?Sized + 'static,
793{
794    let Some(address) = req
795        .params
796        .get("address")
797        .and_then(Value::as_str)
798        .map(String::from)
799    else {
800        return fail(
801            &req.id,
802            codes::INVALID_PARAMS,
803            "getTransactionsByAddress requires `address`",
804        );
805    };
806    let options = TransactionsByAddressOptions {
807        before: req
808            .params
809            .get("before")
810            .and_then(Value::as_str)
811            .map(String::from),
812        until: req
813            .params
814            .get("until")
815            .and_then(Value::as_str)
816            .map(String::from),
817        limit: req.params.get("limit").and_then(Value::as_u64),
818        types: req
819            .params
820            .get("type")
821            .and_then(Value::as_str)
822            .map(|s| vec![s.to_string()])
823            .or_else(|| {
824                req.params
825                    .get("types")
826                    .and_then(Value::as_array)
827                    .map(|arr| {
828                        arr.iter()
829                            .filter_map(|v| v.as_str().map(String::from))
830                            .collect()
831                    })
832            })
833            .unwrap_or_default(),
834    };
835    let mut out = get_transactions_by_address(&*ctx.upstream, &address, &options).await;
836    enrich_token_standards(&*ctx.cache, &mut out).await;
837    ok(&req.id, serde_json::to_value(out).unwrap_or(Value::Null))
838}
839
840/// `getTransfersByAddress(address, options)` — JSON-RPC. Returns a
841/// flat list of parsed SOL + SPL transfer events per signature,
842/// filtered by mint/direction. BEST_EFFORT: only sees what Surfpool
843/// has streamed.
844pub(crate) async fn handle_get_transfers_by_address<S, C, U>(
845    ctx: &Ctx<S, C, U>,
846    req: &JsonRpcRequest,
847) -> Value
848where
849    S: CnftStore + ?Sized,
850    C: CacheStore + ?Sized,
851    U: UpstreamClient + ?Sized + 'static,
852{
853    // Real Helius accepts positional `[address, options]` over JSON-RPC.
854    // Our dispatcher normalizes that into req.params {address, ...opts}
855    // via the json_rpc layer; both shapes resolve here the same way.
856    let Some(address) = req
857        .params
858        .get("address")
859        .and_then(Value::as_str)
860        .map(String::from)
861    else {
862        return fail(
863            &req.id,
864            codes::INVALID_PARAMS,
865            "getTransfersByAddress requires `address`",
866        );
867    };
868    let opts = TransfersByAddressOptions {
869        mint: req
870            .params
871            .get("mint")
872            .and_then(Value::as_str)
873            .map(String::from),
874        direction: req
875            .params
876            .get("direction")
877            .and_then(Value::as_str)
878            .and_then(Direction::parse),
879        limit: req.params.get("limit").and_then(Value::as_u64),
880        sort: req
881            .params
882            .get("sort")
883            .and_then(Value::as_str)
884            .map(Sort::parse)
885            .unwrap_or_default(),
886        pagination_token: req
887            .params
888            .get("paginationToken")
889            .and_then(Value::as_str)
890            .map(String::from),
891    };
892    let result = get_transfers_by_address(&*ctx.upstream, &address, &opts).await;
893    ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null))
894}
895
896/// `getTransactionsForAddress(address, options)` — JSON-RPC. Combined
897/// sig fetch + tx fetch + classify. Returns enhanced transactions
898/// (full bodies, not just signatures). BEST_EFFORT: history limited
899/// to what the upstream has.
900pub(crate) async fn handle_get_transactions_for_address<S, C, U>(
901    ctx: &Ctx<S, C, U>,
902    req: &JsonRpcRequest,
903) -> Value
904where
905    S: CnftStore + ?Sized,
906    C: CacheStore + ?Sized,
907    U: UpstreamClient + ?Sized + 'static,
908{
909    let Some(address) = req
910        .params
911        .get("address")
912        .and_then(Value::as_str)
913        .map(String::from)
914    else {
915        return fail(
916            &req.id,
917            codes::INVALID_PARAMS,
918            "getTransactionsForAddress requires `address`",
919        );
920    };
921    let opts = TransactionsForAddressOptions {
922        limit: req.params.get("limit").and_then(Value::as_u64),
923        pagination_token: req
924            .params
925            .get("paginationToken")
926            .and_then(Value::as_str)
927            .map(String::from),
928        min_slot: req.params.get("minSlot").and_then(Value::as_u64),
929        max_slot: req.params.get("maxSlot").and_then(Value::as_u64),
930        status: req
931            .params
932            .get("status")
933            .and_then(Value::as_str)
934            .and_then(TxStatus::parse),
935    };
936    let mut result = get_transactions_for_address(&*ctx.upstream, &address, &opts).await;
937    enrich_token_standards(&*ctx.cache, &mut result.data).await;
938    ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null))
939}
940
941/// `getProgramAccountsV2` — cursor-paginated passthrough over
942/// `getProgramAccounts`. Forwards user-supplied filters / dataSlice /
943/// encoding verbatim to the upstream, sorts by pubkey for stable
944/// pagination, then slices by `cursor` + `limit`. Returns the next
945/// cursor only when there's more data.
946async fn handle_get_program_accounts_v2<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
947where
948    S: CnftStore + ?Sized,
949    C: CacheStore + ?Sized,
950    U: UpstreamClient + ?Sized + 'static,
951{
952    let Some(program_id) = req
953        .params
954        .get("programId")
955        .and_then(Value::as_str)
956        .map(String::from)
957    else {
958        return fail(
959            &req.id,
960            codes::INVALID_PARAMS,
961            "getProgramAccountsV2 requires `programId`",
962        );
963    };
964
965    let mut cfg = serde_json::Map::new();
966    // Forward standard-RPC config fields verbatim when provided.
967    for key in [
968        "encoding",
969        "commitment",
970        "filters",
971        "dataSlice",
972        "minContextSlot",
973    ] {
974        if let Some(v) = req.params.get(key) {
975            cfg.insert(key.to_string(), v.clone());
976        }
977    }
978    let params = json!([program_id, Value::Object(cfg)]);
979
980    let raw = match ctx.upstream.rpc_call("getProgramAccounts", params).await {
981        Ok(r) => r,
982        Err(e) => {
983            return fail(
984                &req.id,
985                codes::INTERNAL_ERROR,
986                format!("upstream getProgramAccounts failed: {e}"),
987            );
988        }
989    };
990
991    let cursor = req
992        .params
993        .get("cursor")
994        .and_then(Value::as_str)
995        .map(String::from);
996    let limit = req
997        .params
998        .get("limit")
999        .and_then(Value::as_u64)
1000        .unwrap_or(1000);
1001
1002    let response = build_cursor_page(&raw, cursor.as_deref(), limit);
1003    ok(&req.id, response)
1004}
1005
1006/// `getTokenAccountsByOwnerV2` — cursor-paginated passthrough over
1007/// `getTokenAccountsByOwner`. Same cursor semantics as V2 above.
1008async fn handle_get_token_accounts_by_owner_v2<S, C, U>(
1009    ctx: &Ctx<S, C, U>,
1010    req: &JsonRpcRequest,
1011) -> Value
1012where
1013    S: CnftStore + ?Sized,
1014    C: CacheStore + ?Sized,
1015    U: UpstreamClient + ?Sized + 'static,
1016{
1017    let Some(owner) = req
1018        .params
1019        .get("owner")
1020        .and_then(Value::as_str)
1021        .map(String::from)
1022    else {
1023        return fail(
1024            &req.id,
1025            codes::INVALID_PARAMS,
1026            "getTokenAccountsByOwnerV2 requires `owner`",
1027        );
1028    };
1029
1030    // Underlying RPC wants one of { mint } or { programId } as its
1031    // filter object. Prefer mint when both are given (more specific).
1032    let filter_obj = if let Some(mint) = req.params.get("mint").and_then(Value::as_str) {
1033        json!({ "mint": mint })
1034    } else if let Some(program_id) = req.params.get("programId").and_then(Value::as_str) {
1035        json!({ "programId": program_id })
1036    } else {
1037        return fail(
1038            &req.id,
1039            codes::INVALID_PARAMS,
1040            "getTokenAccountsByOwnerV2 requires `mint` or `programId`",
1041        );
1042    };
1043
1044    let mut cfg = serde_json::Map::new();
1045    for key in ["encoding", "commitment", "minContextSlot"] {
1046        if let Some(v) = req.params.get(key) {
1047            cfg.insert(key.to_string(), v.clone());
1048        }
1049    }
1050    let params = json!([owner, filter_obj, Value::Object(cfg)]);
1051
1052    let raw = match ctx
1053        .upstream
1054        .rpc_call("getTokenAccountsByOwner", params)
1055        .await
1056    {
1057        Ok(r) => r,
1058        Err(e) => {
1059            return fail(
1060                &req.id,
1061                codes::INTERNAL_ERROR,
1062                format!("upstream getTokenAccountsByOwner failed: {e}"),
1063            );
1064        }
1065    };
1066
1067    let cursor = req
1068        .params
1069        .get("cursor")
1070        .and_then(Value::as_str)
1071        .map(String::from);
1072    let limit = req
1073        .params
1074        .get("limit")
1075        .and_then(Value::as_u64)
1076        .unwrap_or(1000);
1077
1078    let response = build_cursor_page(&raw, cursor.as_deref(), limit);
1079    ok(&req.id, response)
1080}
1081
1082/// Slice the upstream's `[{pubkey, account}, ...]` (or
1083/// `{context, value: [...]}`) payload into a cursor-paginated page.
1084/// Cursor = last pubkey of the previous page; items are ordered
1085/// lexicographically by pubkey so a cursor-based walk is deterministic.
1086fn build_cursor_page(raw: &[u8], cursor: Option<&str>, limit: u64) -> Value {
1087    let parsed: Value = serde_json::from_slice(raw).unwrap_or(Value::Null);
1088    let array = if let Some(inner) = parsed.get("value") {
1089        inner.as_array().cloned().unwrap_or_default()
1090    } else {
1091        parsed.as_array().cloned().unwrap_or_default()
1092    };
1093
1094    let mut sorted = array;
1095    sorted.sort_by(|a, b| {
1096        let ak = a.get("pubkey").and_then(Value::as_str).unwrap_or("");
1097        let bk = b.get("pubkey").and_then(Value::as_str).unwrap_or("");
1098        ak.cmp(bk)
1099    });
1100
1101    // Apply cursor: drop everything at or before the given pubkey.
1102    let mut filtered: Vec<Value> = if let Some(c) = cursor {
1103        sorted
1104            .into_iter()
1105            .filter(|entry| {
1106                entry
1107                    .get("pubkey")
1108                    .and_then(Value::as_str)
1109                    .is_some_and(|pk| pk > c)
1110            })
1111            .collect()
1112    } else {
1113        sorted
1114    };
1115
1116    // Limit.
1117    let limit_usize = usize::try_from(limit.max(1)).unwrap_or(1000);
1118    let has_more = filtered.len() > limit_usize;
1119    filtered.truncate(limit_usize);
1120    let next_cursor = if has_more {
1121        filtered
1122            .last()
1123            .and_then(|e| e.get("pubkey"))
1124            .and_then(Value::as_str)
1125            .map(String::from)
1126    } else {
1127        None
1128    };
1129
1130    match next_cursor {
1131        Some(c) => json!({ "items": filtered, "cursor": c }),
1132        None => json!({ "items": filtered }),
1133    }
1134}
1135
1136/// `helius.tx.sendTransactionWithSender`.
1137///
1138/// Real Helius routes the tx through its parallel Jito-relay fleet
1139/// for faster landing. Locally we can't reproduce the fleet, so we
1140/// shim by forwarding the tx to the upstream's plain
1141/// `sendTransaction`. Callers get a signature back; inclusion latency
1142/// is whatever the local validator produces.
1143///
1144/// Params mirror `sendTransaction` and forward untouched. Helius-
1145/// specific knobs like `skipPreflight` or Jito-tip addresses flow
1146/// through to Surfpool, which either supports or silently ignores them.
1147async fn handle_send_transaction_with_sender<S, C, U>(
1148    ctx: &Ctx<S, C, U>,
1149    req: &JsonRpcRequest,
1150) -> Value
1151where
1152    S: CnftStore + ?Sized,
1153    C: CacheStore + ?Sized,
1154    U: UpstreamClient + ?Sized + 'static,
1155{
1156    match ctx
1157        .upstream
1158        .rpc_call("sendTransaction", req.params.clone())
1159        .await
1160    {
1161        Ok(raw) => {
1162            let result: Value = serde_json::from_slice(&raw).unwrap_or(Value::Null);
1163            ok(&req.id, result)
1164        }
1165        Err(e) => fail(
1166            &req.id,
1167            codes::INTERNAL_ERROR,
1168            format!("upstream sendTransaction failed: {e}"),
1169        ),
1170    }
1171}
1172
1173/// `helius.tx.getPriorityFeeEstimate` — computes percentiles locally
1174/// over `getRecentPrioritizationFees` samples. On Surfpool (local, no
1175/// contention) the upstream returns an empty array and every level is
1176/// 0, which is the correct answer for a no-contention environment.
1177///
1178/// Supports both response shapes:
1179/// - `includeAllPriorityFeeLevels: true` → `{ priorityFeeLevels: {...} }`
1180/// - otherwise → `{ priorityFeeEstimate: <single number> }` using the
1181///   requested `priorityLevel` (defaults to `medium`).
1182async fn handle_get_priority_fee_estimate<S, C, U>(
1183    ctx: &Ctx<S, C, U>,
1184    req: &JsonRpcRequest,
1185) -> Value
1186where
1187    S: CnftStore + ?Sized,
1188    C: CacheStore + ?Sized,
1189    U: UpstreamClient + ?Sized + 'static,
1190{
1191    // Helius accepts params as [{ accountKeys, options }]; also tolerant
1192    // of the bare object form { accountKeys, options }.
1193    let params_obj = match &req.params {
1194        Value::Array(a) => a.first().cloned().unwrap_or(Value::Null),
1195        other => other.clone(),
1196    };
1197    let account_keys: Vec<String> = params_obj
1198        .get("accountKeys")
1199        .and_then(Value::as_array)
1200        .map(|arr| {
1201            arr.iter()
1202                .filter_map(|v| v.as_str().map(String::from))
1203                .collect()
1204        })
1205        .unwrap_or_default();
1206    let options = params_obj.get("options").cloned().unwrap_or(Value::Null);
1207    let include_all = options
1208        .get("includeAllPriorityFeeLevels")
1209        .and_then(Value::as_bool)
1210        .unwrap_or(false);
1211
1212    // Fetch recent prioritization fees via the upstream. Surfpool (no
1213    // contention) returns []; real devnet/mainnet returns up to 150
1214    // samples.
1215    let upstream_params = if account_keys.is_empty() {
1216        json!([])
1217    } else {
1218        json!([account_keys])
1219    };
1220    let raw = match ctx
1221        .upstream
1222        .rpc_call("getRecentPrioritizationFees", upstream_params)
1223        .await
1224    {
1225        Ok(r) => r,
1226        Err(e) => {
1227            return fail(
1228                &req.id,
1229                codes::INTERNAL_ERROR,
1230                format!("upstream getRecentPrioritizationFees failed: {e}"),
1231            );
1232        }
1233    };
1234    // Result shape: [{ slot: u64, prioritizationFee: u64 }, ...]
1235    let fees: Vec<u64> = serde_json::from_slice::<Value>(&raw)
1236        .ok()
1237        .and_then(|v| v.as_array().cloned())
1238        .map(|arr| {
1239            arr.iter()
1240                .filter_map(|entry| entry.get("prioritizationFee").and_then(Value::as_u64))
1241                .collect()
1242        })
1243        .unwrap_or_default();
1244
1245    let levels = compute_levels(&fees);
1246
1247    if include_all {
1248        ok(
1249            &req.id,
1250            json!({
1251                "priorityFeeLevels": levels,
1252            }),
1253        )
1254    } else {
1255        // Single-level response path. Parse priorityLevel ("medium" by
1256        // default) and resolve against the sorted distribution.
1257        let level: PriorityLevel = options
1258            .get("priorityLevel")
1259            .and_then(Value::as_str)
1260            .and_then(|s| match s {
1261                "min" | "Min" => Some(PriorityLevel::Min),
1262                "low" | "Low" => Some(PriorityLevel::Low),
1263                "medium" | "Medium" => Some(PriorityLevel::Medium),
1264                "high" | "High" => Some(PriorityLevel::High),
1265                "veryHigh" | "VeryHigh" => Some(PriorityLevel::VeryHigh),
1266                "unsafeMax" | "UnsafeMax" => Some(PriorityLevel::UnsafeMax),
1267                _ => None,
1268            })
1269            .unwrap_or(PriorityLevel::Medium);
1270        let mut sorted = fees;
1271        sorted.sort_unstable();
1272        let estimate = percentile_at(&sorted, level);
1273        ok(
1274            &req.id,
1275            json!({
1276                "priorityFeeEstimate": estimate,
1277            }),
1278        )
1279    }
1280}
1281
1282// `async` for symmetry with the other handler signatures — `dispatch`
1283// awaits every variant uniformly. Pure fn-equivalent handlers don't
1284// need to await anything.
1285#[allow(clippy::unused_async)]
1286async fn handle_tidepool_info<S, C, U>(_ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
1287where
1288    S: CnftStore + ?Sized,
1289    C: CacheStore + ?Sized,
1290    U: UpstreamClient + ?Sized + 'static,
1291{
1292    let methods = manifest();
1293    let summary = summarize(methods);
1294    ok(
1295        &req.id,
1296        json!({
1297            "name": "tidepool",
1298            "version": env!("CARGO_PKG_VERSION"),
1299            "methods": methods,
1300            "summary": summary,
1301            // Upstream pins this release was tested against. Parsed
1302            // at compile time from `compatibility.toml` — see
1303            // `crates/service/src/compatibility.rs`.
1304            "compatibility": compatibility(),
1305        }),
1306    )
1307}
1308
1309async fn handle_tidepool_index_tree<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
1310where
1311    S: CnftStore + ?Sized,
1312    C: CacheStore + ?Sized,
1313    U: UpstreamClient + ?Sized + 'static,
1314{
1315    let Some(tree_b58) = req.params.get("tree").and_then(Value::as_str) else {
1316        return fail(&req.id, codes::INVALID_PARAMS, "missing `tree` param");
1317    };
1318    let Some(tree_bytes) = bs58_to_32(tree_b58) else {
1319        return fail(
1320            &req.id,
1321            codes::INVALID_PARAMS,
1322            "`tree` is not a valid 32-byte base58 address",
1323        );
1324    };
1325    let opts = IndexTreeOptions::default();
1326    match index_tree(&*ctx.upstream, &*ctx.cnft, tree_bytes, &opts).await {
1327        Ok(result) => ok(
1328            &req.id,
1329            json!({
1330                "tree": tree_b58,
1331                "processed": result.processed,
1332                "applied": result.applied,
1333                "skipped": result.skipped,
1334            }),
1335        ),
1336        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("Index failed: {e}")),
1337    }
1338}
1339
1340/// `tidepool_exportTreeSnapshot` — export one tree's indexed state as
1341/// a wire envelope the caller can save + later feed back via
1342/// `tidepool_loadTreeSnapshot` or the CLI's `--snapshot` flag.
1343/// Returns `null` when the tree isn't registered.
1344///
1345/// Shape mirrors Surfpool's `surfnet_exportSnapshot` but scoped to
1346/// cNFT tree state (our data model, not SVM accounts).
1347async fn handle_tidepool_export_tree_snapshot<S, C, U>(
1348    ctx: &Ctx<S, C, U>,
1349    req: &JsonRpcRequest,
1350) -> Value
1351where
1352    S: CnftStore + ?Sized,
1353    C: CacheStore + ?Sized,
1354    U: UpstreamClient + ?Sized + 'static,
1355{
1356    let Some(tree_b58) = req.params.get("tree").and_then(Value::as_str) else {
1357        return fail(&req.id, codes::INVALID_PARAMS, "missing `tree` param");
1358    };
1359    let Some(tree_bytes) = bs58_to_32(tree_b58) else {
1360        return fail(
1361            &req.id,
1362            codes::INVALID_PARAMS,
1363            "`tree` is not a valid 32-byte base58 address",
1364        );
1365    };
1366    match tidepool_rpc::cnft::dump_tree(&*ctx.cnft, &tree_bytes).await {
1367        Ok(Some(snapshot)) => {
1368            let blob = tidepool_rpc::cnft::SnapshotBlob::from_tree(&snapshot);
1369            ok(
1370                &req.id,
1371                json!({
1372                    "tree": tree_b58,
1373                    "leafCount": snapshot.leaves.len(),
1374                    "lastSignature": snapshot.last_signature,
1375                    "snapshot": blob,
1376                }),
1377            )
1378        }
1379        Ok(None) => ok(&req.id, Value::Null),
1380        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("dump failed: {e}")),
1381    }
1382}
1383
1384/// `tidepool_loadTreeSnapshot` — apply a previously-exported snapshot
1385/// to the local store. Overwrites any existing state for the tree.
1386async fn handle_tidepool_load_tree_snapshot<S, C, U>(
1387    ctx: &Ctx<S, C, U>,
1388    req: &JsonRpcRequest,
1389) -> Value
1390where
1391    S: CnftStore + ?Sized,
1392    C: CacheStore + ?Sized,
1393    U: UpstreamClient + ?Sized + 'static,
1394{
1395    let Some(snapshot_v) = req.params.get("snapshot") else {
1396        return fail(&req.id, codes::INVALID_PARAMS, "missing `snapshot` param");
1397    };
1398    let blob: tidepool_rpc::cnft::SnapshotBlob = match serde_json::from_value(snapshot_v.clone()) {
1399        Ok(b) => b,
1400        Err(e) => {
1401            return fail(
1402                &req.id,
1403                codes::INVALID_PARAMS,
1404                format!("snapshot envelope: {e}"),
1405            )
1406        }
1407    };
1408    let snapshot = match blob.into_tree_snapshot() {
1409        Ok(s) => s,
1410        Err(e) => return fail(&req.id, codes::INVALID_PARAMS, e),
1411    };
1412    match tidepool_rpc::cnft::load_tree(&*ctx.cnft, snapshot).await {
1413        Ok(summary) => ok(
1414            &req.id,
1415            json!({
1416                "tree": bs58::encode(summary.tree).into_string(),
1417                "leafCount": summary.leaf_count,
1418            }),
1419        ),
1420        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("load failed: {e}")),
1421    }
1422}
1423
1424// ─── param helpers ────────────────────────────────────────────────
1425
1426fn extract_id_param(params: &Value) -> Option<String> {
1427    // Accept both `{ "id": "..." }` and `[...]` positional forms.
1428    if let Some(id) = params.get("id").and_then(Value::as_str) {
1429        return Some(id.to_string());
1430    }
1431    if let Some(arr) = params.as_array() {
1432        if let Some(id) = arr.first().and_then(Value::as_str) {
1433            return Some(id.to_string());
1434        }
1435    }
1436    None
1437}
1438
1439fn bs58_to_32(s: &str) -> Option<[u8; 32]> {
1440    let v = bs58::decode(s).into_vec().ok()?;
1441    v.try_into().ok()
1442}