Skip to main content

tidepool_server/
http.rs

//! axum HTTP server: JSON-RPC over POST, CORS, passthrough proxy for
//! anything the dispatcher doesn't claim.
//!
//! The shape mirrors the TS version: one POST route, one upstream
//! forward-path, full wildcard CORS. We intentionally don't bother
//! with clever routing — the payload tells us which method we're
//! handling.
8
9use std::net::SocketAddr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use axum::{
14    extract::State,
15    http::StatusCode,
16    response::{IntoResponse, Response},
17    routing::post,
18    Json, Router,
19};
20use reqwest::Client;
21use serde_json::{json, Value};
22use tokio::net::TcpListener;
23use tower_http::cors::{Any, CorsLayer};
24use tracing::{error, info, warn};
25
26use tidepool_rpc::cache::{CacheStore, MemoryCache};
27use tidepool_rpc::cnft::{CnftStore, MemoryCnftStore, SqliteCnftStore};
28use tidepool_rpc::das::{AccountDecoder, MplCoreDecoder, TokenMetadataDecoder};
29use tidepool_rpc::sqlite_backend::SqliteBackend;
30use tidepool_rpc::sqlite_cache::SqliteCache;
31use tidepool_rpc::upstream::UpstreamClient;
32use tidepool_rpc::webhooks::{MemoryWebhookRegistry, SqliteWebhookRegistry, WebhookRegistry};
33
34use crate::config::ServerConfig;
35use crate::dispatcher::{dispatch, Ctx};
36use crate::json_rpc::{fail, JsonRpcRequest};
37use crate::upstream_http::HttpUpstream;
38
39/// Serve the tidepool JSON-RPC API according to `config`. Blocks
40/// until the runtime shuts down.
41#[allow(clippy::too_many_lines)]
42pub async fn run(config: ServerConfig) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
43    // Pick persistence: SQLite (single file, Surfpool-style) when
44    // `db` is set to a path or `:memory:`-but-persistent isolate;
45    // plain in-memory otherwise. All three stores share one
46    // connection inside the backend.
47    let (cnft, cache, webhook_registry): (
48        Arc<dyn CnftStore>,
49        Arc<dyn CacheStore>,
50        Arc<dyn WebhookRegistry>,
51    ) = if let Some(db) = &config.db {
52        info!("tidepool persisting state at {}", db.display());
53        let backend = SqliteBackend::open(db)?;
54        (
55            Arc::new(SqliteCnftStore::new(&backend)),
56            Arc::new(SqliteCache::new(&backend)),
57            Arc::new(SqliteWebhookRegistry::new(&backend).await?),
58        )
59    } else {
60        (
61            Arc::new(MemoryCnftStore::new()),
62            Arc::new(MemoryCache::new()),
63            Arc::new(MemoryWebhookRegistry::new()),
64        )
65    };
66
67    let upstream: Arc<dyn UpstreamClient> = Arc::new(HttpUpstream::new(
68        config.upstream_url.clone(),
69        config.rpc_timeout,
70    )?);
71    let decoders: Arc<[Arc<dyn AccountDecoder>]> = Arc::from(vec![
72        Arc::new(MplCoreDecoder) as Arc<dyn AccountDecoder>,
73        Arc::new(TokenMetadataDecoder) as Arc<dyn AccountDecoder>,
74    ]);
75
76    let poster: Arc<dyn tidepool_rpc::webhooks::PostClient> = Arc::new(
77        crate::webhook_runtime::ReqwestPostClient::new(config.rpc_timeout),
78    );
79    let webhooks = Arc::new(crate::webhook_runtime::WebhookRuntime::new(
80        webhook_registry,
81        Arc::clone(&upstream),
82        poster,
83    ));
84
85    let ctx = Ctx {
86        cnft,
87        cache,
88        upstream,
89        decoders,
90        webhooks,
91    };
92
93    // Snapshot preload (--snapshot). Runs synchronously before the
94    // HTTP server starts binding so that by the time requests flow
95    // in, `getAssetProof` etc. can already answer against the loaded
96    // trees. Errors log + continue — a bad snapshot shouldn't wedge
97    // the whole server.
98    for snap_path in &config.snapshots {
99        match std::fs::read(snap_path) {
100            Ok(bytes) => match serde_json::from_slice::<tidepool_rpc::cnft::SnapshotBlob>(&bytes) {
101                Ok(blob) => match blob.into_tree_snapshot() {
102                    Ok(snapshot) => {
103                        match tidepool_rpc::cnft::load_tree(&*ctx.cnft, snapshot).await {
104                            Ok(summary) => info!(
105                                path = %snap_path.display(),
106                                tree = %bs58::encode(summary.tree).into_string(),
107                                leaves = summary.leaf_count,
108                                "loaded snapshot"
109                            ),
110                            Err(e) => {
111                                warn!(path = %snap_path.display(), err = %e, "snapshot apply failed");
112                            }
113                        }
114                    }
115                    Err(e) => {
116                        warn!(path = %snap_path.display(), err = %e, "snapshot decode failed");
117                    }
118                },
119                Err(e) => warn!(path = %snap_path.display(), err = %e, "snapshot parse failed"),
120            },
121            Err(e) => warn!(path = %snap_path.display(), err = %e, "snapshot read failed"),
122        }
123    }
124
125    // Background tree backfill (non-blocking). Failures are logged
126    // and don't prevent the server from starting.
127    for tree in &config.index_trees {
128        let tree = tree.clone();
129        let ctx_clone = ctx.clone();
130        tokio::spawn(async move {
131            match bs58::decode(&tree).into_vec() {
132                Ok(v) if v.len() == 32 => {
133                    let mut bytes = [0u8; 32];
134                    bytes.copy_from_slice(&v);
135                    let opts = tidepool_rpc::cnft::IndexTreeOptions::default();
136                    match tidepool_rpc::cnft::index_tree(
137                        &*ctx_clone.upstream,
138                        &*ctx_clone.cnft,
139                        bytes,
140                        &opts,
141                    )
142                    .await
143                    {
144                        Ok(r) => info!(
145                            tree = %tree,
146                            processed = r.processed,
147                            applied = r.applied,
148                            "indexed tree"
149                        ),
150                        Err(e) => warn!(tree = %tree, err = %e, "failed to index tree"),
151                    }
152                }
153                _ => warn!(tree = %tree, "invalid tree pubkey; skipping indexing"),
154            }
155        });
156    }
157
158    let upstream_url = config.upstream_url.clone();
159    let passthrough_client = Client::builder().timeout(config.rpc_timeout).build()?;
160
161    let state = AppState {
162        ctx,
163        passthrough_url: upstream_url,
164        passthrough_client,
165    };
166
167    let cors = CorsLayer::new()
168        .allow_origin(Any)
169        .allow_methods(Any)
170        .allow_headers(Any);
171
172    // REST layer — mirrors the paths helius-sdk hits on
173    // `api.helius.xyz/v0/...`. Mounted on the same axum server so a
174    // user points their SDK at http://localhost:<port> and gets both
175    // JSON-RPC (POST /) and REST (/v0/*) transports from one place.
176    // Ctx is injected via Extension so the REST router stays state-
177    // type-agnostic and composes cleanly with the typed-state parent.
178    let rest_ctx = Arc::new(state.ctx.clone());
179
180    let app: Router = Router::new()
181        .route("/", post(handle_post))
182        .merge(crate::rest::router::<AppState>())
183        .layer(axum::Extension(rest_ctx))
184        .layer(cors)
185        .with_state(state);
186
187    // WS reverse proxy. Defaults to `port + 1` when ws_port isn't
188    // explicitly set — production CLI shape. Tests pre-bind both
189    // ports and pass them explicitly to dodge parallel races.
190    // Forwards every connection to `upstream_ws_url`; uses
191    // `rpc_timeout` as the upstream-dial timeout.
192    let ws_port = config.ws_port.unwrap_or(config.port + 1);
193    let upstream_ws = config.upstream_ws_url.clone();
194    let ws_timeout = config.rpc_timeout;
195    tokio::spawn(async move {
196        if let Err(e) = crate::ws::run_ws(ws_port, upstream_ws, ws_timeout).await {
197            tracing::error!(err = %e, "ws server exited with error");
198        }
199    });
200
201    let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
202    let listener = TcpListener::bind(&addr).await?;
203    info!("tidepool listening on http://{addr} (ws on :{ws_port})");
204    axum::serve(listener, app).await?;
205    Ok(())
206}
207
208#[derive(Clone)]
209struct AppState {
210    ctx: Ctx<dyn CnftStore, dyn CacheStore, dyn UpstreamClient>,
211    passthrough_url: String,
212    passthrough_client: Client,
213}
214
215async fn handle_post(State(state): State<AppState>, body: axum::body::Bytes) -> Response {
216    let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&body) else {
217        // Forward malformed JSON to upstream unchanged — Surfpool's
218        // own error becomes the user-visible error, matching TS
219        // behavior.
220        return passthrough(&state, &body).await;
221    };
222
223    match dispatch(&state.ctx, &req).await {
224        Some(response_json) => Json(response_json).into_response(),
225        None => passthrough(&state, &body).await,
226    }
227}
228
229async fn passthrough(state: &AppState, body: &axum::body::Bytes) -> Response {
230    match state
231        .passthrough_client
232        .post(&state.passthrough_url)
233        .header("content-type", "application/json")
234        .body(body.clone())
235        .send()
236        .await
237    {
238        Ok(upstream_resp) => {
239            let status = upstream_resp.status();
240            match upstream_resp.bytes().await {
241                Ok(bytes) => {
242                    let mut resp = Response::new(axum::body::Body::from(bytes));
243                    *resp.status_mut() = status;
244                    resp.headers_mut().insert(
245                        axum::http::header::CONTENT_TYPE,
246                        axum::http::HeaderValue::from_static("application/json"),
247                    );
248                    resp
249                }
250                Err(e) => {
251                    error!(err = %e, "failed to read upstream body");
252                    failure_response(502, "Upstream body read failed")
253                }
254            }
255        }
256        Err(e) => {
257            error!(err = %e, "upstream unreachable");
258            failure_response(502, &format!("Surfpool unreachable: {e}"))
259        }
260    }
261}
262
263fn failure_response(status: u16, message: &str) -> Response {
264    let body = fail(
265        &Value::Null,
266        crate::json_rpc::codes::INTERNAL_ERROR,
267        message,
268    );
269    let mut resp = Json(body).into_response();
270    *resp.status_mut() = StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
271    resp
272}
273
// No-op shim that mentions `Duration` in a signature so the
// `std::time::Duration` import is never flagged as unused. Per the
// original note, the real consumer is `ServerConfig::rpc_timeout`
// (via reqwest), which this file only touches indirectly.
#[allow(dead_code)]
fn _duration_use(_unused: Duration) {}
279
280// Silence unused-json! warning when dispatcher isn't compiled with
281// certain handler variants (future-proofing).
282#[allow(dead_code)]
283fn _json_use() -> Value {
284    json!(null)
285}