tidepool_rpc/upstream.rs
1//! Upstream-client trait and a deterministic fixture impl.
2//!
3//! The trait is minimal: one generic `rpc_call` and a typed
4//! `get_account` convenience. Everything the indexer and DAS handlers
5//! need goes through these two methods. A network-backed
6//! implementation (lifted onto `solana-client` crate) lands alongside
7//! the server crate — for the service layer we only depend on the
8//! trait + the fixture impl for tests.
9
10use std::collections::HashMap;
11
12use async_trait::async_trait;
13use thiserror::Error;
14
15/// Raw on-chain account as the indexer and decoders consume it.
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct AccountData {
18 pub data: Vec<u8>,
19 pub owner: [u8; 32],
20 pub lamports: u64,
21}
22
23/// Errors surfaceable by any `UpstreamClient`. Network-backed impls
24/// wrap transport errors; the fixture impl only produces
25/// `MethodNotStubbed`.
26#[derive(Debug, Error)]
27pub enum UpstreamError {
28 #[error("upstream transport error: {0}")]
29 Transport(String),
30 #[error("upstream returned an RPC error: {0}")]
31 Rpc(String),
32 #[error("no fixture stub registered for RPC method '{method}'")]
33 MethodNotStubbed { method: String },
34 #[error("upstream request timed out after {millis}ms")]
35 Timeout { millis: u64 },
36}
37
38pub type UpstreamResult<T> = Result<T, UpstreamError>;
39
40/// An abstract Solana RPC client. `rpc_call` is the catch-all; typed
41/// conveniences sit on top of it where we care about ergonomics.
42#[async_trait]
43pub trait UpstreamClient: Send + Sync {
44 /// Invoke an arbitrary JSON-RPC method and return the `result`
45 /// field as raw JSON bytes. Higher layers deserialize per-method.
46 async fn rpc_call(&self, method: &str, params: serde_json::Value) -> UpstreamResult<Vec<u8>>;
47
48 /// Convenience: read an account by base58 pubkey. Network impl
49 /// goes through `getAccountInfo`; fixture impl reads from its map.
50 /// Returns `Ok(None)` when the account doesn't exist; `Err` for
51 /// transport failure.
52 async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>>;
53
54 /// Fetch arbitrary off-chain content by URI for DAS metadata
55 /// enrichment (the JSON an NFT's on-chain `uri` points at).
56 /// Implementations should support `http(s)://` and `file://`,
57 /// apply a timeout + size cap, and **fail soft** — returning
58 /// `None` on any error rather than propagating, so a blocked or
59 /// slow fetch degrades a `getAsset` response to its on-chain
60 /// fields instead of failing the whole call.
61 ///
62 /// The default returns `None` (no enrichment). The HTTP-backed
63 /// impl overrides it; fixture impls override it to serve canned
64 /// bytes for tests.
65 async fn fetch_uri(&self, _uri: &str) -> Option<Vec<u8>> {
66 None
67 }
68}
69
70/// Closure shape for a fixture RPC method producer. Named to keep the
71/// struct definition readable and to satisfy the `type_complexity`
72/// lint — `Box<dyn Fn(...) -> ...>` gets unwieldy quickly.
73type FixtureRpcHandler =
74 Box<dyn Fn(&serde_json::Value) -> UpstreamResult<serde_json::Value> + Send + Sync>;
75
76/// In-process canned upstream for tests. `rpc_responses` is a
77/// method-name → producer map; `accounts` is consulted by
78/// `get_account`. Producers close over owned state so tests can drive
79/// sequences deterministically without async plumbing.
80pub struct FixtureUpstream {
81 accounts: HashMap<String, AccountData>,
82 rpc_responses: HashMap<String, FixtureRpcHandler>,
83 offchain: HashMap<String, Vec<u8>>,
84}
85
86impl FixtureUpstream {
87 #[must_use]
88 pub fn new() -> Self {
89 Self {
90 accounts: HashMap::new(),
91 rpc_responses: HashMap::new(),
92 offchain: HashMap::new(),
93 }
94 }
95
96 /// Register canned off-chain content for a URI, consulted by
97 /// `fetch_uri`. Lets tests exercise metadata enrichment without
98 /// real network I/O.
99 #[must_use]
100 pub fn with_offchain(mut self, uri: impl Into<String>, body: impl Into<Vec<u8>>) -> Self {
101 self.offchain.insert(uri.into(), body.into());
102 self
103 }
104
105 /// Register an account under its base58 address. The same address
106 /// also satisfies a `getAccountInfo`-shaped `rpc_call` unless a
107 /// producer is registered explicitly for that method.
108 #[must_use]
109 pub fn with_account(mut self, address: impl Into<String>, data: AccountData) -> Self {
110 self.accounts.insert(address.into(), data);
111 self
112 }
113
114 /// Stub a JSON-RPC method with a producer closure that receives
115 /// the raw `params` value and returns the `result` value.
116 #[must_use]
117 pub fn with_method<F>(mut self, method: impl Into<String>, handler: F) -> Self
118 where
119 F: Fn(&serde_json::Value) -> UpstreamResult<serde_json::Value> + Send + Sync + 'static,
120 {
121 self.rpc_responses.insert(method.into(), Box::new(handler));
122 self
123 }
124}
125
126impl Default for FixtureUpstream {
127 fn default() -> Self {
128 Self::new()
129 }
130}
131
132#[async_trait]
133impl UpstreamClient for FixtureUpstream {
134 async fn rpc_call(&self, method: &str, params: serde_json::Value) -> UpstreamResult<Vec<u8>> {
135 if let Some(handler) = self.rpc_responses.get(method) {
136 let value = handler(¶ms)?;
137 return serde_json::to_vec(&value)
138 .map_err(|e| UpstreamError::Transport(format!("serialize fixture result: {e}")));
139 }
140 Err(UpstreamError::MethodNotStubbed {
141 method: method.to_string(),
142 })
143 }
144
145 async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>> {
146 Ok(self.accounts.get(address).cloned())
147 }
148
149 async fn fetch_uri(&self, uri: &str) -> Option<Vec<u8>> {
150 self.offchain.get(uri).cloned()
151 }
152}