Skip to main content

tidepool_rpc/
upstream.rs

//! Upstream-client trait and a deterministic fixture impl.
//!
//! The trait is minimal: one generic `rpc_call`, a typed
//! `get_account` convenience, and an optional, fail-soft `fetch_uri`
//! for off-chain content. Everything the indexer and DAS handlers
//! need goes through these methods. A network-backed
//! implementation (built on the `solana-client` crate) lands alongside
//! the server crate — for the service layer we only depend on the
//! trait + the fixture impl for tests.
9
10use std::collections::HashMap;
11
12use async_trait::async_trait;
13use thiserror::Error;
14
/// Raw on-chain account as the indexer and decoders consume it.
///
/// `Default` yields an empty account (no data, all-zero owner, zero
/// lamports), so fixtures can set only the fields they care about:
/// `AccountData { lamports: 1, ..Default::default() }`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AccountData {
    /// Raw account bytes, left undecoded at this layer.
    pub data: Vec<u8>,
    /// 32-byte key of the account's owner (presumably the owning
    /// program's pubkey — confirm against the decoders).
    pub owner: [u8; 32],
    /// Account balance, in lamports.
    pub lamports: u64,
}
22
23/// Errors surfaceable by any `UpstreamClient`. Network-backed impls
24/// wrap transport errors; the fixture impl only produces
25/// `MethodNotStubbed`.
26#[derive(Debug, Error)]
27pub enum UpstreamError {
28    #[error("upstream transport error: {0}")]
29    Transport(String),
30    #[error("upstream returned an RPC error: {0}")]
31    Rpc(String),
32    #[error("no fixture stub registered for RPC method '{method}'")]
33    MethodNotStubbed { method: String },
34    #[error("upstream request timed out after {millis}ms")]
35    Timeout { millis: u64 },
36}
37
38pub type UpstreamResult<T> = Result<T, UpstreamError>;
39
40/// An abstract Solana RPC client. `rpc_call` is the catch-all; typed
41/// conveniences sit on top of it where we care about ergonomics.
42#[async_trait]
43pub trait UpstreamClient: Send + Sync {
44    /// Invoke an arbitrary JSON-RPC method and return the `result`
45    /// field as raw JSON bytes. Higher layers deserialize per-method.
46    async fn rpc_call(&self, method: &str, params: serde_json::Value) -> UpstreamResult<Vec<u8>>;
47
48    /// Convenience: read an account by base58 pubkey. Network impl
49    /// goes through `getAccountInfo`; fixture impl reads from its map.
50    /// Returns `Ok(None)` when the account doesn't exist; `Err` for
51    /// transport failure.
52    async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>>;
53
54    /// Fetch arbitrary off-chain content by URI for DAS metadata
55    /// enrichment (the JSON an NFT's on-chain `uri` points at).
56    /// Implementations should support `http(s)://` and `file://`,
57    /// apply a timeout + size cap, and **fail soft** — returning
58    /// `None` on any error rather than propagating, so a blocked or
59    /// slow fetch degrades a `getAsset` response to its on-chain
60    /// fields instead of failing the whole call.
61    ///
62    /// The default returns `None` (no enrichment). The HTTP-backed
63    /// impl overrides it; fixture impls override it to serve canned
64    /// bytes for tests.
65    async fn fetch_uri(&self, _uri: &str) -> Option<Vec<u8>> {
66        None
67    }
68}
69
70/// Closure shape for a fixture RPC method producer. Named to keep the
71/// struct definition readable and to satisfy the `type_complexity`
72/// lint — `Box<dyn Fn(...) -> ...>` gets unwieldy quickly.
73type FixtureRpcHandler =
74    Box<dyn Fn(&serde_json::Value) -> UpstreamResult<serde_json::Value> + Send + Sync>;
75
76/// In-process canned upstream for tests. `rpc_responses` is a
77/// method-name → producer map; `accounts` is consulted by
78/// `get_account`. Producers close over owned state so tests can drive
79/// sequences deterministically without async plumbing.
80pub struct FixtureUpstream {
81    accounts: HashMap<String, AccountData>,
82    rpc_responses: HashMap<String, FixtureRpcHandler>,
83    offchain: HashMap<String, Vec<u8>>,
84}
85
86impl FixtureUpstream {
87    #[must_use]
88    pub fn new() -> Self {
89        Self {
90            accounts: HashMap::new(),
91            rpc_responses: HashMap::new(),
92            offchain: HashMap::new(),
93        }
94    }
95
96    /// Register canned off-chain content for a URI, consulted by
97    /// `fetch_uri`. Lets tests exercise metadata enrichment without
98    /// real network I/O.
99    #[must_use]
100    pub fn with_offchain(mut self, uri: impl Into<String>, body: impl Into<Vec<u8>>) -> Self {
101        self.offchain.insert(uri.into(), body.into());
102        self
103    }
104
105    /// Register an account under its base58 address. The same address
106    /// also satisfies a `getAccountInfo`-shaped `rpc_call` unless a
107    /// producer is registered explicitly for that method.
108    #[must_use]
109    pub fn with_account(mut self, address: impl Into<String>, data: AccountData) -> Self {
110        self.accounts.insert(address.into(), data);
111        self
112    }
113
114    /// Stub a JSON-RPC method with a producer closure that receives
115    /// the raw `params` value and returns the `result` value.
116    #[must_use]
117    pub fn with_method<F>(mut self, method: impl Into<String>, handler: F) -> Self
118    where
119        F: Fn(&serde_json::Value) -> UpstreamResult<serde_json::Value> + Send + Sync + 'static,
120    {
121        self.rpc_responses.insert(method.into(), Box::new(handler));
122        self
123    }
124}
125
126impl Default for FixtureUpstream {
127    fn default() -> Self {
128        Self::new()
129    }
130}
131
132#[async_trait]
133impl UpstreamClient for FixtureUpstream {
134    async fn rpc_call(&self, method: &str, params: serde_json::Value) -> UpstreamResult<Vec<u8>> {
135        if let Some(handler) = self.rpc_responses.get(method) {
136            let value = handler(&params)?;
137            return serde_json::to_vec(&value)
138                .map_err(|e| UpstreamError::Transport(format!("serialize fixture result: {e}")));
139        }
140        Err(UpstreamError::MethodNotStubbed {
141            method: method.to_string(),
142        })
143    }
144
145    async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>> {
146        Ok(self.accounts.get(address).cloned())
147    }
148
149    async fn fetch_uri(&self, uri: &str) -> Option<Vec<u8>> {
150        self.offchain.get(uri).cloned()
151    }
152}