Skip to main content

tidepool_rpc/
upstream.rs

1//! Upstream-client trait and a deterministic fixture impl.
2//!
3//! The trait is minimal: one generic `rpc_call` and a typed
4//! `get_account` convenience. Everything the indexer and DAS handlers
5//! need goes through these two methods. A network-backed
6//! implementation (lifted onto `solana-client` crate) lands alongside
7//! the server crate — for the service layer we only depend on the
8//! trait + the fixture impl for tests.
9
10use std::collections::HashMap;
11
12use async_trait::async_trait;
13use thiserror::Error;
14
15/// Raw on-chain account as the indexer and decoders consume it.
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct AccountData {
18    pub data: Vec<u8>,
19    pub owner: [u8; 32],
20    pub lamports: u64,
21}
22
23/// Errors surfaceable by any `UpstreamClient`. Network-backed impls
24/// wrap transport errors; the fixture impl only produces
25/// `MethodNotStubbed`.
26#[derive(Debug, Error)]
27pub enum UpstreamError {
28    #[error("upstream transport error: {0}")]
29    Transport(String),
30    #[error("upstream returned an RPC error: {0}")]
31    Rpc(String),
32    #[error("no fixture stub registered for RPC method '{method}'")]
33    MethodNotStubbed { method: String },
34    #[error("upstream request timed out after {millis}ms")]
35    Timeout { millis: u64 },
36}
37
38pub type UpstreamResult<T> = Result<T, UpstreamError>;
39
40/// An abstract Solana RPC client. `rpc_call` is the catch-all; typed
41/// conveniences sit on top of it where we care about ergonomics.
42#[async_trait]
43pub trait UpstreamClient: Send + Sync {
44    /// Invoke an arbitrary JSON-RPC method and return the `result`
45    /// field as raw JSON bytes. Higher layers deserialize per-method.
46    async fn rpc_call(&self, method: &str, params: serde_json::Value) -> UpstreamResult<Vec<u8>>;
47
48    /// Convenience: read an account by base58 pubkey. Network impl
49    /// goes through `getAccountInfo`; fixture impl reads from its map.
50    /// Returns `Ok(None)` when the account doesn't exist; `Err` for
51    /// transport failure.
52    async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>>;
53}
54
55/// Closure shape for a fixture RPC method producer. Named to keep the
56/// struct definition readable and to satisfy the `type_complexity`
57/// lint — `Box<dyn Fn(...) -> ...>` gets unwieldy quickly.
58type FixtureRpcHandler =
59    Box<dyn Fn(&serde_json::Value) -> UpstreamResult<serde_json::Value> + Send + Sync>;
60
61/// In-process canned upstream for tests. `rpc_responses` is a
62/// method-name → producer map; `accounts` is consulted by
63/// `get_account`. Producers close over owned state so tests can drive
64/// sequences deterministically without async plumbing.
65pub struct FixtureUpstream {
66    accounts: HashMap<String, AccountData>,
67    rpc_responses: HashMap<String, FixtureRpcHandler>,
68}
69
70impl FixtureUpstream {
71    #[must_use]
72    pub fn new() -> Self {
73        Self {
74            accounts: HashMap::new(),
75            rpc_responses: HashMap::new(),
76        }
77    }
78
79    /// Register an account under its base58 address. The same address
80    /// also satisfies a `getAccountInfo`-shaped `rpc_call` unless a
81    /// producer is registered explicitly for that method.
82    #[must_use]
83    pub fn with_account(mut self, address: impl Into<String>, data: AccountData) -> Self {
84        self.accounts.insert(address.into(), data);
85        self
86    }
87
88    /// Stub a JSON-RPC method with a producer closure that receives
89    /// the raw `params` value and returns the `result` value.
90    #[must_use]
91    pub fn with_method<F>(mut self, method: impl Into<String>, handler: F) -> Self
92    where
93        F: Fn(&serde_json::Value) -> UpstreamResult<serde_json::Value> + Send + Sync + 'static,
94    {
95        self.rpc_responses.insert(method.into(), Box::new(handler));
96        self
97    }
98}
99
100impl Default for FixtureUpstream {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106#[async_trait]
107impl UpstreamClient for FixtureUpstream {
108    async fn rpc_call(&self, method: &str, params: serde_json::Value) -> UpstreamResult<Vec<u8>> {
109        if let Some(handler) = self.rpc_responses.get(method) {
110            let value = handler(&params)?;
111            return serde_json::to_vec(&value)
112                .map_err(|e| UpstreamError::Transport(format!("serialize fixture result: {e}")));
113        }
114        Err(UpstreamError::MethodNotStubbed {
115            method: method.to_string(),
116        })
117    }
118
119    async fn get_account(&self, address: &str) -> UpstreamResult<Option<AccountData>> {
120        Ok(self.accounts.get(address).cloned())
121    }
122}