Skip to main content

tidepool_server/
dispatcher.rs

//! JSON-RPC method dispatch. The core architectural win: every
//! method we serve maps to a [`Method`] enum variant, and the dispatch
//! function does an **exhaustive** match. Adding a new method = add
//! a variant + a match arm; compiler fails the build if you forget.
//!
//! Methods we don't recognize make [`Method::from_wire`] return `None`,
//! and the caller forwards the raw JSON-RPC envelope to the upstream.
8
9use std::sync::Arc;
10
11use serde_json::{json, Value};
12use tracing::warn;
13
14use tidepool_rpc::cache::{CacheStore, SearchFilter};
15use tidepool_rpc::cnft::{index_tree, CnftStore, IndexTreeOptions};
16use tidepool_rpc::compat::{manifest, summarize};
17use tidepool_rpc::compatibility::compatibility;
18use tidepool_rpc::das::{
19    get_asset_full, get_asset_proof, get_asset_proof_batch, get_assets_by_authority,
20    get_assets_by_creator, get_assets_by_group, get_assets_by_owner, get_balances,
21    get_nft_editions, get_token_accounts, search_assets, AccountDecoder, TokenAccountsFilter,
22};
23use tidepool_rpc::enhanced::{
24    enrich_token_standards, get_transactions, get_transactions_by_address,
25    TransactionsByAddressOptions,
26};
27use tidepool_rpc::priority_fee::{compute_levels, percentile_at, PriorityLevel};
28use tidepool_rpc::upstream::UpstreamClient;
29use tidepool_rpc::webhooks::{PostClient, WebhookError, WebhookInput};
30
31use crate::json_rpc::{codes, fail, ok, JsonRpcRequest};
32use crate::webhook_runtime::WebhookRuntime;
33
/// JSON-RPC methods this server answers itself. A wire name with no
/// variant here is not ours; the caller forwards it upstream untouched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Method {
    // DAS
    GetAsset,
    GetAssetBatch,
    GetAssetProof,
    GetAssetProofBatch,
    GetAssetsByOwner,
    GetAssetsByAuthority,
    GetAssetsByCreator,
    GetAssetsByGroup,
    SearchAssets,
    GetNftEditions,
    GetTokenAccounts,
    // Helius v2 (cursor-paginated)
    GetProgramAccountsV2,
    GetTokenAccountsByOwnerV2,
    // NOTE: getBalances, the webhook CRUD calls (createWebhook,
    // getAllWebhooks, ...), getTransactions, and
    // getTransactionsByAddress intentionally have no variant. Helius
    // exposes them over REST (`api.helius.xyz/v0/...`), not JSON-RPC,
    // so answering them here would let local code work that then fails
    // against real Helius. `crate::rest` routes them to the same
    // `pub(crate) handle_*` functions instead.
    // Tx (Helius-custom)
    GetPriorityFeeEstimate,
    SendTransactionWithSender,
    // Tidepool custom
    TidepoolInfo,
    TidepoolIndexTree,
    TidepoolExportTreeSnapshot,
    TidepoolLoadTreeSnapshot,
}

impl Method {
    /// Every variant, listed once. `from_wire` searches this list, so a
    /// new variant has to be added here to become reachable by name.
    const ALL: [Self; 19] = [
        Self::GetAsset,
        Self::GetAssetBatch,
        Self::GetAssetProof,
        Self::GetAssetProofBatch,
        Self::GetAssetsByOwner,
        Self::GetAssetsByAuthority,
        Self::GetAssetsByCreator,
        Self::GetAssetsByGroup,
        Self::SearchAssets,
        Self::GetNftEditions,
        Self::GetTokenAccounts,
        Self::GetProgramAccountsV2,
        Self::GetTokenAccountsByOwnerV2,
        Self::GetPriorityFeeEstimate,
        Self::SendTransactionWithSender,
        Self::TidepoolInfo,
        Self::TidepoolIndexTree,
        Self::TidepoolExportTreeSnapshot,
        Self::TidepoolLoadTreeSnapshot,
    ];

    /// Resolve a wire method name to its variant.
    ///
    /// Matching is exact and case-sensitive. `None` means the server
    /// does not own the method and the caller should take the
    /// passthrough path.
    #[must_use]
    pub fn from_wire(name: &str) -> Option<Self> {
        Self::ALL
            .iter()
            .copied()
            .find(|candidate| candidate.to_wire() == name)
    }

    /// The wire name for this variant — the inverse of `from_wire`,
    /// useful for the compat manifest.
    #[must_use]
    pub fn to_wire(self) -> &'static str {
        match self {
            // DAS
            Self::GetAsset => "getAsset",
            Self::GetAssetBatch => "getAssetBatch",
            Self::GetAssetProof => "getAssetProof",
            Self::GetAssetProofBatch => "getAssetProofBatch",
            Self::GetAssetsByOwner => "getAssetsByOwner",
            Self::GetAssetsByAuthority => "getAssetsByAuthority",
            Self::GetAssetsByCreator => "getAssetsByCreator",
            Self::GetAssetsByGroup => "getAssetsByGroup",
            Self::SearchAssets => "searchAssets",
            Self::GetNftEditions => "getNftEditions",
            Self::GetTokenAccounts => "getTokenAccounts",
            // Helius v2
            Self::GetProgramAccountsV2 => "getProgramAccountsV2",
            Self::GetTokenAccountsByOwnerV2 => "getTokenAccountsByOwnerV2",
            // Tx
            Self::GetPriorityFeeEstimate => "getPriorityFeeEstimate",
            Self::SendTransactionWithSender => "sendTransactionWithSender",
            // Tidepool custom
            Self::TidepoolInfo => "tidepool_info",
            Self::TidepoolIndexTree => "tidepool_indexTree",
            Self::TidepoolExportTreeSnapshot => "tidepool_exportTreeSnapshot",
            Self::TidepoolLoadTreeSnapshot => "tidepool_loadTreeSnapshot",
        }
    }
}
126
/// Shared request-handling context. Wired once at server start and
/// passed to every dispatch call.
///
/// Every field is an `Arc`, so the context is cheap to clone. The
/// type parameters are `?Sized`, which lets callers plug in trait
/// objects as well as concrete store / client types.
pub struct Ctx<S, C, U>
where
    S: CnftStore + ?Sized,
    C: CacheStore + ?Sized,
    U: UpstreamClient + ?Sized + 'static,
{
    /// Compressed-NFT store; handed to the asset and asset-proof lookups.
    pub cnft: Arc<S>,
    /// Asset cache; backs the `getAssetsBy*` / `searchAssets` queries and
    /// the token-standard enrichment of enhanced transactions.
    pub cache: Arc<C>,
    /// Upstream RPC client used by the shim handlers in this file.
    pub upstream: Arc<U>,
    /// Account decoders passed along to the asset and edition lookups.
    pub decoders: Arc<[Arc<dyn AccountDecoder>]>,
    /// Webhook runtime that the webhook CRUD handlers call into.
    pub webhooks: Arc<WebhookRuntime<U, dyn PostClient>>,
}
141
142impl<S, C, U> Clone for Ctx<S, C, U>
143where
144    S: CnftStore + ?Sized,
145    C: CacheStore + ?Sized,
146    U: UpstreamClient + ?Sized + 'static,
147{
148    fn clone(&self) -> Self {
149        Self {
150            cnft: Arc::clone(&self.cnft),
151            cache: Arc::clone(&self.cache),
152            upstream: Arc::clone(&self.upstream),
153            decoders: Arc::clone(&self.decoders),
154            webhooks: Arc::clone(&self.webhooks),
155        }
156    }
157}
158
159/// Dispatch one JSON-RPC request. Returns `Some(response)` when we
160/// handled it natively, `None` when the caller should passthrough.
161pub async fn dispatch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Option<Value>
162where
163    S: CnftStore + ?Sized,
164    C: CacheStore + ?Sized,
165    U: UpstreamClient + ?Sized + 'static,
166{
167    let method = Method::from_wire(&req.method)?;
168    Some(match method {
169        Method::GetAsset => handle_get_asset(ctx, req).await,
170        Method::GetAssetBatch => handle_get_asset_batch(ctx, req).await,
171        Method::GetAssetProof => handle_get_asset_proof(ctx, req).await,
172        Method::GetAssetProofBatch => handle_get_asset_proof_batch(ctx, req).await,
173        Method::GetAssetsByOwner => handle_get_assets_by_owner(ctx, req).await,
174        Method::GetAssetsByAuthority => handle_get_assets_by_authority(ctx, req).await,
175        Method::GetAssetsByCreator => handle_get_assets_by_creator(ctx, req).await,
176        Method::GetAssetsByGroup => handle_get_assets_by_group(ctx, req).await,
177        Method::SearchAssets => handle_search_assets(ctx, req).await,
178        Method::GetNftEditions => handle_get_nft_editions(ctx, req).await,
179        Method::GetTokenAccounts => handle_get_token_accounts(ctx, req).await,
180        // NOTE: getBalances, webhook CRUD, and Enhanced Transactions
181        // are reachable only via the REST router (`crate::rest`) —
182        // Helius serves them on `api.helius.xyz/v0/...`, not JSON-RPC.
183        // The handler functions still live in this file but aren't
184        // wired to any `Method` variant here.
185        Method::GetProgramAccountsV2 => handle_get_program_accounts_v2(ctx, req).await,
186        Method::GetTokenAccountsByOwnerV2 => handle_get_token_accounts_by_owner_v2(ctx, req).await,
187        Method::GetPriorityFeeEstimate => handle_get_priority_fee_estimate(ctx, req).await,
188        Method::SendTransactionWithSender => handle_send_transaction_with_sender(ctx, req).await,
189        Method::TidepoolInfo => handle_tidepool_info(ctx, req).await,
190        Method::TidepoolIndexTree => handle_tidepool_index_tree(ctx, req).await,
191        Method::TidepoolExportTreeSnapshot => handle_tidepool_export_tree_snapshot(ctx, req).await,
192        Method::TidepoolLoadTreeSnapshot => handle_tidepool_load_tree_snapshot(ctx, req).await,
193    })
194}
195
196// ─── per-method handlers ──────────────────────────────────────────
197
198async fn handle_get_asset<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
199where
200    S: CnftStore + ?Sized,
201    C: CacheStore + ?Sized,
202    U: UpstreamClient + ?Sized + 'static,
203{
204    let Some(asset_id) = extract_id_param(&req.params) else {
205        return fail(&req.id, codes::INVALID_PARAMS, "missing `id` param");
206    };
207    match get_asset_full(
208        &*ctx.cnft,
209        &*ctx.cache,
210        &*ctx.upstream,
211        &ctx.decoders,
212        &asset_id,
213    )
214    .await
215    {
216        Ok(Some(asset)) => ok(&req.id, serde_json::to_value(asset).unwrap_or(Value::Null)),
217        Ok(None) => fail(&req.id, codes::INTERNAL_ERROR, "Asset not found"),
218        Err(e) => {
219            warn!(method = "getAsset", err = %e, "handler failed");
220            fail(&req.id, codes::INTERNAL_ERROR, format!("{e}"))
221        }
222    }
223}
224
225async fn handle_get_asset_batch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
226where
227    S: CnftStore + ?Sized,
228    C: CacheStore + ?Sized,
229    U: UpstreamClient + ?Sized + 'static,
230{
231    let Some(ids) = req.params.get("ids").and_then(Value::as_array) else {
232        return fail(&req.id, codes::INVALID_PARAMS, "missing `ids` array");
233    };
234    let ids: Vec<String> = ids
235        .iter()
236        .filter_map(|v| v.as_str().map(String::from))
237        .collect();
238    match tidepool_rpc::das::get_asset_batch(
239        &*ctx.cnft,
240        &*ctx.cache,
241        &*ctx.upstream,
242        &ctx.decoders,
243        &ids,
244    )
245    .await
246    {
247        Ok(results) => ok(
248            &req.id,
249            serde_json::to_value(results).unwrap_or(Value::Null),
250        ),
251        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
252    }
253}
254
255async fn handle_get_asset_proof<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
256where
257    S: CnftStore + ?Sized,
258    C: CacheStore + ?Sized,
259    U: UpstreamClient + ?Sized + 'static,
260{
261    let Some(asset_id) = extract_id_param(&req.params) else {
262        return fail(&req.id, codes::INVALID_PARAMS, "missing `id` param");
263    };
264    let Some(id_bytes) = bs58_to_32(&asset_id) else {
265        return fail(
266            &req.id,
267            codes::INVALID_PARAMS,
268            "`id` is not a valid 32-byte base58 address",
269        );
270    };
271    match get_asset_proof(&*ctx.cnft, &id_bytes).await {
272        Ok(Some(p)) => ok(&req.id, serde_json::to_value(p).unwrap_or(Value::Null)),
273        Ok(None) => fail(
274            &req.id,
275            codes::INTERNAL_ERROR,
276            "Asset not found or tree not indexed",
277        ),
278        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
279    }
280}
281
282async fn handle_get_asset_proof_batch<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
283where
284    S: CnftStore + ?Sized,
285    C: CacheStore + ?Sized,
286    U: UpstreamClient + ?Sized + 'static,
287{
288    let Some(ids) = req.params.get("ids").and_then(Value::as_array) else {
289        return fail(&req.id, codes::INVALID_PARAMS, "missing `ids` array");
290    };
291    let id_bytes: Vec<[u8; 32]> = ids
292        .iter()
293        .filter_map(|v| v.as_str())
294        .filter_map(bs58_to_32)
295        .collect();
296    match get_asset_proof_batch(&*ctx.cnft, &id_bytes).await {
297        Ok(results) => {
298            let map: serde_json::Map<String, Value> = ids
299                .iter()
300                .filter_map(|v| v.as_str())
301                .zip(results.into_iter())
302                .map(|(id, proof)| {
303                    (
304                        id.to_string(),
305                        proof.map_or(Value::Null, |p| {
306                            serde_json::to_value(p).unwrap_or(Value::Null)
307                        }),
308                    )
309                })
310                .collect();
311            ok(&req.id, Value::Object(map))
312        }
313        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
314    }
315}
316
317async fn handle_get_assets_by_owner<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
318where
319    S: CnftStore + ?Sized,
320    C: CacheStore + ?Sized,
321    U: UpstreamClient + ?Sized + 'static,
322{
323    let Some(owner) = req.params.get("ownerAddress").and_then(Value::as_str) else {
324        return fail(&req.id, codes::INVALID_PARAMS, "missing `ownerAddress`");
325    };
326    match get_assets_by_owner(&*ctx.cache, owner).await {
327        Ok(items) => ok(&req.id, serde_json::json!({ "items": items })),
328        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
329    }
330}
331
332async fn handle_get_assets_by_authority<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
333where
334    S: CnftStore + ?Sized,
335    C: CacheStore + ?Sized,
336    U: UpstreamClient + ?Sized + 'static,
337{
338    let Some(authority) = req.params.get("authorityAddress").and_then(Value::as_str) else {
339        return fail(&req.id, codes::INVALID_PARAMS, "missing `authorityAddress`");
340    };
341    match get_assets_by_authority(&*ctx.cache, authority).await {
342        Ok(items) => ok(&req.id, json!({ "items": items })),
343        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
344    }
345}
346
347async fn handle_get_assets_by_creator<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
348where
349    S: CnftStore + ?Sized,
350    C: CacheStore + ?Sized,
351    U: UpstreamClient + ?Sized + 'static,
352{
353    let Some(creator) = req.params.get("creatorAddress").and_then(Value::as_str) else {
354        return fail(&req.id, codes::INVALID_PARAMS, "missing `creatorAddress`");
355    };
356    let only_verified = req
357        .params
358        .get("onlyVerified")
359        .and_then(Value::as_bool)
360        .unwrap_or(false);
361    match get_assets_by_creator(&*ctx.cache, creator, only_verified).await {
362        Ok(items) => ok(&req.id, json!({ "items": items })),
363        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
364    }
365}
366
367async fn handle_get_assets_by_group<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
368where
369    S: CnftStore + ?Sized,
370    C: CacheStore + ?Sized,
371    U: UpstreamClient + ?Sized + 'static,
372{
373    let gk = req.params.get("groupKey").and_then(Value::as_str);
374    let gv = req.params.get("groupValue").and_then(Value::as_str);
375    let (Some(gk), Some(gv)) = (gk, gv) else {
376        return fail(
377            &req.id,
378            codes::INVALID_PARAMS,
379            "missing `groupKey` / `groupValue`",
380        );
381    };
382    match get_assets_by_group(&*ctx.cache, gk, gv).await {
383        Ok(items) => ok(&req.id, json!({ "items": items })),
384        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
385    }
386}
387
388async fn handle_search_assets<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
389where
390    S: CnftStore + ?Sized,
391    C: CacheStore + ?Sized,
392    U: UpstreamClient + ?Sized + 'static,
393{
394    let filter = SearchFilter {
395        owner_address: req
396            .params
397            .get("ownerAddress")
398            .and_then(Value::as_str)
399            .map(String::from),
400        authority_address: req
401            .params
402            .get("authorityAddress")
403            .and_then(Value::as_str)
404            .map(String::from),
405        creator_address: req
406            .params
407            .get("creatorAddress")
408            .and_then(Value::as_str)
409            .map(String::from),
410        creator_verified: req.params.get("creatorVerified").and_then(Value::as_bool),
411        grouping: req
412            .params
413            .get("grouping")
414            .and_then(Value::as_array)
415            .and_then(|arr| {
416                let k = arr.first()?.as_str()?.to_string();
417                let v = arr.get(1)?.as_str()?.to_string();
418                Some((k, v))
419            }),
420        interface: req
421            .params
422            .get("interface")
423            .and_then(Value::as_str)
424            .map(String::from),
425        burnt: req.params.get("burnt").and_then(Value::as_bool),
426    };
427    match search_assets(&*ctx.cache, &filter).await {
428        Ok(items) => ok(&req.id, json!({ "items": items })),
429        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
430    }
431}
432
433/// `helius.das.getNftEditions(mint, page, limit)` — serves from the
434/// local edition index populated on `getAsset` fetches. Cold-path
435/// calls do one upstream fetch of the master mint to warm the index;
436/// subsequent calls serve from cache.
437async fn handle_get_nft_editions<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
438where
439    S: CnftStore + ?Sized,
440    C: CacheStore + ?Sized,
441    U: UpstreamClient + ?Sized + 'static,
442{
443    let mint = req
444        .params
445        .get("mint")
446        .or_else(|| req.params.get("id"))
447        .and_then(Value::as_str);
448    let Some(mint) = mint else {
449        return fail(
450            &req.id,
451            codes::INVALID_PARAMS,
452            "getNftEditions requires `mint`",
453        );
454    };
455    let page = req.params.get("page").and_then(Value::as_u64).unwrap_or(1);
456    // Helius's default page size is 100.
457    let limit = req
458        .params
459        .get("limit")
460        .and_then(Value::as_u64)
461        .unwrap_or(100);
462
463    match get_nft_editions(
464        &*ctx.cache,
465        &*ctx.upstream,
466        &ctx.decoders,
467        mint,
468        page,
469        limit,
470    )
471    .await
472    {
473        Ok(Some(result)) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
474        Ok(None) => ok(&req.id, Value::Null),
475        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
476    }
477}
478
479/// `helius.das.getTokenAccounts(owner?, mint?, page?, limit?,
480/// displayOptions.showZeroBalance?)`. Shim — forwards to the upstream
481/// RPC (`getTokenAccountsByOwner` or `getProgramAccounts` memcmp),
482/// reshapes the response, and paginates locally.
483async fn handle_get_token_accounts<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
484where
485    S: CnftStore + ?Sized,
486    C: CacheStore + ?Sized,
487    U: UpstreamClient + ?Sized + 'static,
488{
489    let owner = req
490        .params
491        .get("owner")
492        .and_then(Value::as_str)
493        .map(String::from);
494    let mint = req
495        .params
496        .get("mint")
497        .and_then(Value::as_str)
498        .map(String::from);
499    let page = req.params.get("page").and_then(Value::as_u64).unwrap_or(1);
500    let limit = req
501        .params
502        .get("limit")
503        .and_then(Value::as_u64)
504        .unwrap_or(100);
505    let show_zero_balance = req
506        .params
507        .pointer("/displayOptions/showZeroBalance")
508        .and_then(Value::as_bool)
509        .unwrap_or(false);
510
511    let filter = TokenAccountsFilter {
512        owner,
513        mint,
514        page,
515        limit,
516        show_zero_balance,
517    };
518
519    match get_token_accounts(&*ctx.upstream, &filter).await {
520        Ok(result) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
521        Err(e) => {
522            // BadRequest → invalid-params code; everything else is internal.
523            let code = match &e {
524                tidepool_rpc::das::DasError::BadRequest(_) => codes::INVALID_PARAMS,
525                _ => codes::INTERNAL_ERROR,
526            };
527            fail(&req.id, code, format!("{e}"))
528        }
529    }
530}
531
532/// `helius.wallet.getBalances(owner)` — returns native SOL + all
533/// SPL/Token-2022 positions the wallet holds. Shim — fans out to
534/// `getBalance` + one `getTokenAccountsByOwner` per program.
535pub(crate) async fn handle_get_balances<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
536where
537    S: CnftStore + ?Sized,
538    C: CacheStore + ?Sized,
539    U: UpstreamClient + ?Sized + 'static,
540{
541    // Helius accepts either `owner` or positional `[owner]`.
542    let owner = req.params.get("owner").and_then(Value::as_str).or_else(|| {
543        req.params
544            .as_array()
545            .and_then(|a| a.first())
546            .and_then(Value::as_str)
547    });
548    let Some(owner) = owner else {
549        return fail(
550            &req.id,
551            codes::INVALID_PARAMS,
552            "getBalances requires `owner`",
553        );
554    };
555    match get_balances(&*ctx.upstream, owner).await {
556        Ok(result) => ok(&req.id, serde_json::to_value(result).unwrap_or(Value::Null)),
557        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("{e}")),
558    }
559}
560
561// ─── Webhook CRUD ──────────────────────────────────────────────────
562// All five handlers front the `WebhookRuntime` on the Ctx — creation
563// spawns a per-webhook polling task; deletion aborts it; edit
564// restarts the task with the new config.
565
566fn parse_webhook_input(params: &Value) -> WebhookInput {
567    // Accept both Helius's camelCase (`webhookURL`, `accountAddresses`,
568    // `transactionTypes`, `txnStatus`, `webhookType`, `authHeader`)
569    // wire keys and our snake_case serde defaults.
570    let url = params
571        .get("webhookURL")
572        .or_else(|| params.get("webhook_url"))
573        .and_then(Value::as_str)
574        .map(String::from);
575    let addresses = params
576        .get("accountAddresses")
577        .or_else(|| params.get("account_addresses"))
578        .and_then(Value::as_array)
579        .map(|a| {
580            a.iter()
581                .filter_map(|v| v.as_str().map(String::from))
582                .collect()
583        });
584    let transaction_types = params
585        .get("transactionTypes")
586        .or_else(|| params.get("transaction_types"))
587        .and_then(Value::as_array)
588        .map(|a| {
589            a.iter()
590                .filter_map(|v| v.as_str().map(String::from))
591                .collect()
592        })
593        .unwrap_or_default();
594    let txn_status = params
595        .get("txnStatus")
596        .or_else(|| params.get("txn_status"))
597        .and_then(Value::as_str)
598        .map(String::from);
599    let webhook_type = params
600        .get("webhookType")
601        .or_else(|| params.get("webhook_type"))
602        .and_then(Value::as_str)
603        .map(String::from);
604    let auth_header = params
605        .get("authHeader")
606        .or_else(|| params.get("auth_header"))
607        .and_then(Value::as_str)
608        .map(String::from);
609    WebhookInput {
610        webhook_url: url,
611        account_addresses: addresses,
612        transaction_types,
613        txn_status,
614        webhook_type,
615        auth_header,
616    }
617}
618
619fn webhook_error_to_response(id: &Value, e: &WebhookError) -> Value {
620    let code = match e {
621        WebhookError::BadRequest(_) => codes::INVALID_PARAMS,
622        WebhookError::NotFound { .. } => codes::INTERNAL_ERROR,
623    };
624    fail(id, code, format!("{e}"))
625}
626
627pub(crate) async fn handle_create_webhook<S, C, U>(
628    ctx: &Ctx<S, C, U>,
629    req: &JsonRpcRequest,
630) -> Value
631where
632    S: CnftStore + ?Sized,
633    C: CacheStore + ?Sized,
634    U: UpstreamClient + ?Sized + 'static,
635{
636    let input = parse_webhook_input(&req.params);
637    match ctx.webhooks.create(input).await {
638        Ok(wh) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
639        Err(e) => webhook_error_to_response(&req.id, &e),
640    }
641}
642
643pub(crate) async fn handle_get_all_webhooks<S, C, U>(
644    ctx: &Ctx<S, C, U>,
645    req: &JsonRpcRequest,
646) -> Value
647where
648    S: CnftStore + ?Sized,
649    C: CacheStore + ?Sized,
650    U: UpstreamClient + ?Sized + 'static,
651{
652    match ctx.webhooks.list().await {
653        Ok(items) => ok(&req.id, serde_json::to_value(items).unwrap_or(Value::Null)),
654        Err(e) => webhook_error_to_response(&req.id, &e),
655    }
656}
657
658pub(crate) async fn handle_get_webhook_by_id<S, C, U>(
659    ctx: &Ctx<S, C, U>,
660    req: &JsonRpcRequest,
661) -> Value
662where
663    S: CnftStore + ?Sized,
664    C: CacheStore + ?Sized,
665    U: UpstreamClient + ?Sized + 'static,
666{
667    let id = req
668        .params
669        .get("webhookID")
670        .or_else(|| req.params.get("webhook_id"))
671        .or_else(|| req.params.get("id"))
672        .and_then(Value::as_str);
673    let Some(id) = id else {
674        return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
675    };
676    match ctx.webhooks.get(id).await {
677        Ok(Some(wh)) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
678        Ok(None) => ok(&req.id, Value::Null),
679        Err(e) => webhook_error_to_response(&req.id, &e),
680    }
681}
682
683pub(crate) async fn handle_edit_webhook<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
684where
685    S: CnftStore + ?Sized,
686    C: CacheStore + ?Sized,
687    U: UpstreamClient + ?Sized + 'static,
688{
689    let id = req
690        .params
691        .get("webhookID")
692        .or_else(|| req.params.get("webhook_id"))
693        .or_else(|| req.params.get("id"))
694        .and_then(Value::as_str);
695    let Some(id) = id else {
696        return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
697    };
698    let input = parse_webhook_input(&req.params);
699    match ctx.webhooks.edit(id, input).await {
700        Ok(wh) => ok(&req.id, serde_json::to_value(wh).unwrap_or(Value::Null)),
701        Err(e) => webhook_error_to_response(&req.id, &e),
702    }
703}
704
705pub(crate) async fn handle_delete_webhook<S, C, U>(
706    ctx: &Ctx<S, C, U>,
707    req: &JsonRpcRequest,
708) -> Value
709where
710    S: CnftStore + ?Sized,
711    C: CacheStore + ?Sized,
712    U: UpstreamClient + ?Sized + 'static,
713{
714    let id = req
715        .params
716        .get("webhookID")
717        .or_else(|| req.params.get("webhook_id"))
718        .or_else(|| req.params.get("id"))
719        .and_then(Value::as_str);
720    let Some(id) = id else {
721        return fail(&req.id, codes::INVALID_PARAMS, "missing `webhookID`");
722    };
723    match ctx.webhooks.delete(id).await {
724        Ok(removed) => ok(&req.id, json!({ "deleted": removed })),
725        Err(e) => webhook_error_to_response(&req.id, &e),
726    }
727}
728
729// ─── Enhanced Transactions ─────────────────────────────────────────
730
731/// `helius.enhanced.getTransactions([signature, ...])`. Fans out one
732/// `getTransaction` per signature and classifies each.
733pub(crate) async fn handle_get_transactions<S, C, U>(
734    ctx: &Ctx<S, C, U>,
735    req: &JsonRpcRequest,
736) -> Value
737where
738    S: CnftStore + ?Sized,
739    C: CacheStore + ?Sized,
740    U: UpstreamClient + ?Sized + 'static,
741{
742    let sigs: Vec<String> = req
743        .params
744        .get("signatures")
745        .or_else(|| {
746            // positional fallback: first param may be the array.
747            req.params.as_array().and_then(|a| a.first())
748        })
749        .and_then(Value::as_array)
750        .map(|arr| {
751            arr.iter()
752                .filter_map(|v| v.as_str().map(String::from))
753                .collect()
754        })
755        .unwrap_or_default();
756    if sigs.is_empty() {
757        return fail(
758            &req.id,
759            codes::INVALID_PARAMS,
760            "getTransactions requires a non-empty `signatures` array",
761        );
762    }
763    let mut out = get_transactions(&*ctx.upstream, &sigs).await;
764    // Opportunistic enrichment: if a transfer's mint is already in
765    // the DAS cache, we know its tokenStandard without another
766    // upstream hop. Misses stay None (skip-on-serialize).
767    enrich_token_standards(&*ctx.cache, &mut out).await;
768    ok(&req.id, serde_json::to_value(out).unwrap_or(Value::Null))
769}
770
771/// `helius.enhanced.getTransactionsByAddress(address, options)`.
772/// Resolves signatures via `getSignaturesForAddress` then fans out.
773pub(crate) async fn handle_get_transactions_by_address<S, C, U>(
774    ctx: &Ctx<S, C, U>,
775    req: &JsonRpcRequest,
776) -> Value
777where
778    S: CnftStore + ?Sized,
779    C: CacheStore + ?Sized,
780    U: UpstreamClient + ?Sized + 'static,
781{
782    let Some(address) = req
783        .params
784        .get("address")
785        .and_then(Value::as_str)
786        .map(String::from)
787    else {
788        return fail(
789            &req.id,
790            codes::INVALID_PARAMS,
791            "getTransactionsByAddress requires `address`",
792        );
793    };
794    let options = TransactionsByAddressOptions {
795        before: req
796            .params
797            .get("before")
798            .and_then(Value::as_str)
799            .map(String::from),
800        until: req
801            .params
802            .get("until")
803            .and_then(Value::as_str)
804            .map(String::from),
805        limit: req.params.get("limit").and_then(Value::as_u64),
806        types: req
807            .params
808            .get("type")
809            .and_then(Value::as_str)
810            .map(|s| vec![s.to_string()])
811            .or_else(|| {
812                req.params
813                    .get("types")
814                    .and_then(Value::as_array)
815                    .map(|arr| {
816                        arr.iter()
817                            .filter_map(|v| v.as_str().map(String::from))
818                            .collect()
819                    })
820            })
821            .unwrap_or_default(),
822    };
823    let mut out = get_transactions_by_address(&*ctx.upstream, &address, &options).await;
824    enrich_token_standards(&*ctx.cache, &mut out).await;
825    ok(&req.id, serde_json::to_value(out).unwrap_or(Value::Null))
826}
827
828/// `getProgramAccountsV2` — cursor-paginated passthrough over
829/// `getProgramAccounts`. Forwards user-supplied filters / dataSlice /
830/// encoding verbatim to the upstream, sorts by pubkey for stable
831/// pagination, then slices by `cursor` + `limit`. Returns the next
832/// cursor only when there's more data.
833async fn handle_get_program_accounts_v2<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
834where
835    S: CnftStore + ?Sized,
836    C: CacheStore + ?Sized,
837    U: UpstreamClient + ?Sized + 'static,
838{
839    let Some(program_id) = req
840        .params
841        .get("programId")
842        .and_then(Value::as_str)
843        .map(String::from)
844    else {
845        return fail(
846            &req.id,
847            codes::INVALID_PARAMS,
848            "getProgramAccountsV2 requires `programId`",
849        );
850    };
851
852    let mut cfg = serde_json::Map::new();
853    // Forward standard-RPC config fields verbatim when provided.
854    for key in [
855        "encoding",
856        "commitment",
857        "filters",
858        "dataSlice",
859        "minContextSlot",
860    ] {
861        if let Some(v) = req.params.get(key) {
862            cfg.insert(key.to_string(), v.clone());
863        }
864    }
865    let params = json!([program_id, Value::Object(cfg)]);
866
867    let raw = match ctx.upstream.rpc_call("getProgramAccounts", params).await {
868        Ok(r) => r,
869        Err(e) => {
870            return fail(
871                &req.id,
872                codes::INTERNAL_ERROR,
873                format!("upstream getProgramAccounts failed: {e}"),
874            );
875        }
876    };
877
878    let cursor = req
879        .params
880        .get("cursor")
881        .and_then(Value::as_str)
882        .map(String::from);
883    let limit = req
884        .params
885        .get("limit")
886        .and_then(Value::as_u64)
887        .unwrap_or(1000);
888
889    let response = build_cursor_page(&raw, cursor.as_deref(), limit);
890    ok(&req.id, response)
891}
892
893/// `getTokenAccountsByOwnerV2` — cursor-paginated passthrough over
894/// `getTokenAccountsByOwner`. Same cursor semantics as V2 above.
895async fn handle_get_token_accounts_by_owner_v2<S, C, U>(
896    ctx: &Ctx<S, C, U>,
897    req: &JsonRpcRequest,
898) -> Value
899where
900    S: CnftStore + ?Sized,
901    C: CacheStore + ?Sized,
902    U: UpstreamClient + ?Sized + 'static,
903{
904    let Some(owner) = req
905        .params
906        .get("owner")
907        .and_then(Value::as_str)
908        .map(String::from)
909    else {
910        return fail(
911            &req.id,
912            codes::INVALID_PARAMS,
913            "getTokenAccountsByOwnerV2 requires `owner`",
914        );
915    };
916
917    // Underlying RPC wants one of { mint } or { programId } as its
918    // filter object. Prefer mint when both are given (more specific).
919    let filter_obj = if let Some(mint) = req.params.get("mint").and_then(Value::as_str) {
920        json!({ "mint": mint })
921    } else if let Some(program_id) = req.params.get("programId").and_then(Value::as_str) {
922        json!({ "programId": program_id })
923    } else {
924        return fail(
925            &req.id,
926            codes::INVALID_PARAMS,
927            "getTokenAccountsByOwnerV2 requires `mint` or `programId`",
928        );
929    };
930
931    let mut cfg = serde_json::Map::new();
932    for key in ["encoding", "commitment", "minContextSlot"] {
933        if let Some(v) = req.params.get(key) {
934            cfg.insert(key.to_string(), v.clone());
935        }
936    }
937    let params = json!([owner, filter_obj, Value::Object(cfg)]);
938
939    let raw = match ctx
940        .upstream
941        .rpc_call("getTokenAccountsByOwner", params)
942        .await
943    {
944        Ok(r) => r,
945        Err(e) => {
946            return fail(
947                &req.id,
948                codes::INTERNAL_ERROR,
949                format!("upstream getTokenAccountsByOwner failed: {e}"),
950            );
951        }
952    };
953
954    let cursor = req
955        .params
956        .get("cursor")
957        .and_then(Value::as_str)
958        .map(String::from);
959    let limit = req
960        .params
961        .get("limit")
962        .and_then(Value::as_u64)
963        .unwrap_or(1000);
964
965    let response = build_cursor_page(&raw, cursor.as_deref(), limit);
966    ok(&req.id, response)
967}
968
969/// Slice the upstream's `[{pubkey, account}, ...]` (or
970/// `{context, value: [...]}`) payload into a cursor-paginated page.
971/// Cursor = last pubkey of the previous page; items are ordered
972/// lexicographically by pubkey so a cursor-based walk is deterministic.
973fn build_cursor_page(raw: &[u8], cursor: Option<&str>, limit: u64) -> Value {
974    let parsed: Value = serde_json::from_slice(raw).unwrap_or(Value::Null);
975    let array = if let Some(inner) = parsed.get("value") {
976        inner.as_array().cloned().unwrap_or_default()
977    } else {
978        parsed.as_array().cloned().unwrap_or_default()
979    };
980
981    let mut sorted = array;
982    sorted.sort_by(|a, b| {
983        let ak = a.get("pubkey").and_then(Value::as_str).unwrap_or("");
984        let bk = b.get("pubkey").and_then(Value::as_str).unwrap_or("");
985        ak.cmp(bk)
986    });
987
988    // Apply cursor: drop everything at or before the given pubkey.
989    let mut filtered: Vec<Value> = if let Some(c) = cursor {
990        sorted
991            .into_iter()
992            .filter(|entry| {
993                entry
994                    .get("pubkey")
995                    .and_then(Value::as_str)
996                    .is_some_and(|pk| pk > c)
997            })
998            .collect()
999    } else {
1000        sorted
1001    };
1002
1003    // Limit.
1004    let limit_usize = usize::try_from(limit.max(1)).unwrap_or(1000);
1005    let has_more = filtered.len() > limit_usize;
1006    filtered.truncate(limit_usize);
1007    let next_cursor = if has_more {
1008        filtered
1009            .last()
1010            .and_then(|e| e.get("pubkey"))
1011            .and_then(Value::as_str)
1012            .map(String::from)
1013    } else {
1014        None
1015    };
1016
1017    match next_cursor {
1018        Some(c) => json!({ "items": filtered, "cursor": c }),
1019        None => json!({ "items": filtered }),
1020    }
1021}
1022
1023/// `helius.tx.sendTransactionWithSender`.
1024///
1025/// Real Helius routes the tx through its parallel Jito-relay fleet
1026/// for faster landing. Locally we can't reproduce the fleet, so we
1027/// shim by forwarding the tx to the upstream's plain
1028/// `sendTransaction`. Callers get a signature back; inclusion latency
1029/// is whatever the local validator produces.
1030///
1031/// Params mirror `sendTransaction` and forward untouched. Helius-
1032/// specific knobs like `skipPreflight` or Jito-tip addresses flow
1033/// through to Surfpool, which either supports or silently ignores them.
1034async fn handle_send_transaction_with_sender<S, C, U>(
1035    ctx: &Ctx<S, C, U>,
1036    req: &JsonRpcRequest,
1037) -> Value
1038where
1039    S: CnftStore + ?Sized,
1040    C: CacheStore + ?Sized,
1041    U: UpstreamClient + ?Sized + 'static,
1042{
1043    match ctx
1044        .upstream
1045        .rpc_call("sendTransaction", req.params.clone())
1046        .await
1047    {
1048        Ok(raw) => {
1049            let result: Value = serde_json::from_slice(&raw).unwrap_or(Value::Null);
1050            ok(&req.id, result)
1051        }
1052        Err(e) => fail(
1053            &req.id,
1054            codes::INTERNAL_ERROR,
1055            format!("upstream sendTransaction failed: {e}"),
1056        ),
1057    }
1058}
1059
1060/// `helius.tx.getPriorityFeeEstimate` — computes percentiles locally
1061/// over `getRecentPrioritizationFees` samples. On Surfpool (local, no
1062/// contention) the upstream returns an empty array and every level is
1063/// 0, which is the correct answer for a no-contention environment.
1064///
1065/// Supports both response shapes:
1066/// - `includeAllPriorityFeeLevels: true` → `{ priorityFeeLevels: {...} }`
1067/// - otherwise → `{ priorityFeeEstimate: <single number> }` using the
1068///   requested `priorityLevel` (defaults to `medium`).
1069async fn handle_get_priority_fee_estimate<S, C, U>(
1070    ctx: &Ctx<S, C, U>,
1071    req: &JsonRpcRequest,
1072) -> Value
1073where
1074    S: CnftStore + ?Sized,
1075    C: CacheStore + ?Sized,
1076    U: UpstreamClient + ?Sized + 'static,
1077{
1078    // Helius accepts params as [{ accountKeys, options }]; also tolerant
1079    // of the bare object form { accountKeys, options }.
1080    let params_obj = match &req.params {
1081        Value::Array(a) => a.first().cloned().unwrap_or(Value::Null),
1082        other => other.clone(),
1083    };
1084    let account_keys: Vec<String> = params_obj
1085        .get("accountKeys")
1086        .and_then(Value::as_array)
1087        .map(|arr| {
1088            arr.iter()
1089                .filter_map(|v| v.as_str().map(String::from))
1090                .collect()
1091        })
1092        .unwrap_or_default();
1093    let options = params_obj.get("options").cloned().unwrap_or(Value::Null);
1094    let include_all = options
1095        .get("includeAllPriorityFeeLevels")
1096        .and_then(Value::as_bool)
1097        .unwrap_or(false);
1098
1099    // Fetch recent prioritization fees via the upstream. Surfpool (no
1100    // contention) returns []; real devnet/mainnet returns up to 150
1101    // samples.
1102    let upstream_params = if account_keys.is_empty() {
1103        json!([])
1104    } else {
1105        json!([account_keys])
1106    };
1107    let raw = match ctx
1108        .upstream
1109        .rpc_call("getRecentPrioritizationFees", upstream_params)
1110        .await
1111    {
1112        Ok(r) => r,
1113        Err(e) => {
1114            return fail(
1115                &req.id,
1116                codes::INTERNAL_ERROR,
1117                format!("upstream getRecentPrioritizationFees failed: {e}"),
1118            );
1119        }
1120    };
1121    // Result shape: [{ slot: u64, prioritizationFee: u64 }, ...]
1122    let fees: Vec<u64> = serde_json::from_slice::<Value>(&raw)
1123        .ok()
1124        .and_then(|v| v.as_array().cloned())
1125        .map(|arr| {
1126            arr.iter()
1127                .filter_map(|entry| entry.get("prioritizationFee").and_then(Value::as_u64))
1128                .collect()
1129        })
1130        .unwrap_or_default();
1131
1132    let levels = compute_levels(&fees);
1133
1134    if include_all {
1135        ok(
1136            &req.id,
1137            json!({
1138                "priorityFeeLevels": levels,
1139            }),
1140        )
1141    } else {
1142        // Single-level response path. Parse priorityLevel ("medium" by
1143        // default) and resolve against the sorted distribution.
1144        let level: PriorityLevel = options
1145            .get("priorityLevel")
1146            .and_then(Value::as_str)
1147            .and_then(|s| match s {
1148                "min" | "Min" => Some(PriorityLevel::Min),
1149                "low" | "Low" => Some(PriorityLevel::Low),
1150                "medium" | "Medium" => Some(PriorityLevel::Medium),
1151                "high" | "High" => Some(PriorityLevel::High),
1152                "veryHigh" | "VeryHigh" => Some(PriorityLevel::VeryHigh),
1153                "unsafeMax" | "UnsafeMax" => Some(PriorityLevel::UnsafeMax),
1154                _ => None,
1155            })
1156            .unwrap_or(PriorityLevel::Medium);
1157        let mut sorted = fees;
1158        sorted.sort_unstable();
1159        let estimate = percentile_at(&sorted, level);
1160        ok(
1161            &req.id,
1162            json!({
1163                "priorityFeeEstimate": estimate,
1164            }),
1165        )
1166    }
1167}
1168
1169// `async` for symmetry with the other handler signatures — `dispatch`
1170// awaits every variant uniformly. Pure fn-equivalent handlers don't
1171// need to await anything.
1172#[allow(clippy::unused_async)]
1173async fn handle_tidepool_info<S, C, U>(_ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
1174where
1175    S: CnftStore + ?Sized,
1176    C: CacheStore + ?Sized,
1177    U: UpstreamClient + ?Sized + 'static,
1178{
1179    let methods = manifest();
1180    let summary = summarize(methods);
1181    ok(
1182        &req.id,
1183        json!({
1184            "name": "tidepool",
1185            "version": env!("CARGO_PKG_VERSION"),
1186            "methods": methods,
1187            "summary": summary,
1188            // Upstream pins this release was tested against. Parsed
1189            // at compile time from `compatibility.toml` — see
1190            // `crates/service/src/compatibility.rs`.
1191            "compatibility": compatibility(),
1192        }),
1193    )
1194}
1195
1196async fn handle_tidepool_index_tree<S, C, U>(ctx: &Ctx<S, C, U>, req: &JsonRpcRequest) -> Value
1197where
1198    S: CnftStore + ?Sized,
1199    C: CacheStore + ?Sized,
1200    U: UpstreamClient + ?Sized + 'static,
1201{
1202    let Some(tree_b58) = req.params.get("tree").and_then(Value::as_str) else {
1203        return fail(&req.id, codes::INVALID_PARAMS, "missing `tree` param");
1204    };
1205    let Some(tree_bytes) = bs58_to_32(tree_b58) else {
1206        return fail(
1207            &req.id,
1208            codes::INVALID_PARAMS,
1209            "`tree` is not a valid 32-byte base58 address",
1210        );
1211    };
1212    let opts = IndexTreeOptions::default();
1213    match index_tree(&*ctx.upstream, &*ctx.cnft, tree_bytes, &opts).await {
1214        Ok(result) => ok(
1215            &req.id,
1216            json!({
1217                "tree": tree_b58,
1218                "processed": result.processed,
1219                "applied": result.applied,
1220                "skipped": result.skipped,
1221            }),
1222        ),
1223        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("Index failed: {e}")),
1224    }
1225}
1226
1227/// `tidepool_exportTreeSnapshot` — export one tree's indexed state as
1228/// a wire envelope the caller can save + later feed back via
1229/// `tidepool_loadTreeSnapshot` or the CLI's `--snapshot` flag.
1230/// Returns `null` when the tree isn't registered.
1231///
1232/// Shape mirrors Surfpool's `surfnet_exportSnapshot` but scoped to
1233/// cNFT tree state (our data model, not SVM accounts).
1234async fn handle_tidepool_export_tree_snapshot<S, C, U>(
1235    ctx: &Ctx<S, C, U>,
1236    req: &JsonRpcRequest,
1237) -> Value
1238where
1239    S: CnftStore + ?Sized,
1240    C: CacheStore + ?Sized,
1241    U: UpstreamClient + ?Sized + 'static,
1242{
1243    let Some(tree_b58) = req.params.get("tree").and_then(Value::as_str) else {
1244        return fail(&req.id, codes::INVALID_PARAMS, "missing `tree` param");
1245    };
1246    let Some(tree_bytes) = bs58_to_32(tree_b58) else {
1247        return fail(
1248            &req.id,
1249            codes::INVALID_PARAMS,
1250            "`tree` is not a valid 32-byte base58 address",
1251        );
1252    };
1253    match tidepool_rpc::cnft::dump_tree(&*ctx.cnft, &tree_bytes).await {
1254        Ok(Some(snapshot)) => {
1255            let blob = tidepool_rpc::cnft::SnapshotBlob::from_tree(&snapshot);
1256            ok(
1257                &req.id,
1258                json!({
1259                    "tree": tree_b58,
1260                    "leafCount": snapshot.leaves.len(),
1261                    "lastSignature": snapshot.last_signature,
1262                    "snapshot": blob,
1263                }),
1264            )
1265        }
1266        Ok(None) => ok(&req.id, Value::Null),
1267        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("dump failed: {e}")),
1268    }
1269}
1270
1271/// `tidepool_loadTreeSnapshot` — apply a previously-exported snapshot
1272/// to the local store. Overwrites any existing state for the tree.
1273async fn handle_tidepool_load_tree_snapshot<S, C, U>(
1274    ctx: &Ctx<S, C, U>,
1275    req: &JsonRpcRequest,
1276) -> Value
1277where
1278    S: CnftStore + ?Sized,
1279    C: CacheStore + ?Sized,
1280    U: UpstreamClient + ?Sized + 'static,
1281{
1282    let Some(snapshot_v) = req.params.get("snapshot") else {
1283        return fail(&req.id, codes::INVALID_PARAMS, "missing `snapshot` param");
1284    };
1285    let blob: tidepool_rpc::cnft::SnapshotBlob = match serde_json::from_value(snapshot_v.clone()) {
1286        Ok(b) => b,
1287        Err(e) => {
1288            return fail(
1289                &req.id,
1290                codes::INVALID_PARAMS,
1291                format!("snapshot envelope: {e}"),
1292            )
1293        }
1294    };
1295    let snapshot = match blob.into_tree_snapshot() {
1296        Ok(s) => s,
1297        Err(e) => return fail(&req.id, codes::INVALID_PARAMS, e),
1298    };
1299    match tidepool_rpc::cnft::load_tree(&*ctx.cnft, snapshot).await {
1300        Ok(summary) => ok(
1301            &req.id,
1302            json!({
1303                "tree": bs58::encode(summary.tree).into_string(),
1304                "leafCount": summary.leaf_count,
1305            }),
1306        ),
1307        Err(e) => fail(&req.id, codes::INTERNAL_ERROR, format!("load failed: {e}")),
1308    }
1309}
1310
1311// ─── param helpers ────────────────────────────────────────────────
1312
1313fn extract_id_param(params: &Value) -> Option<String> {
1314    // Accept both `{ "id": "..." }` and `[...]` positional forms.
1315    if let Some(id) = params.get("id").and_then(Value::as_str) {
1316        return Some(id.to_string());
1317    }
1318    if let Some(arr) = params.as_array() {
1319        if let Some(id) = arr.first().and_then(Value::as_str) {
1320            return Some(id.to_string());
1321        }
1322    }
1323    None
1324}
1325
1326fn bs58_to_32(s: &str) -> Option<[u8; 32]> {
1327    let v = bs58::decode(s).into_vec().ok()?;
1328    v.try_into().ok()
1329}