1use std::net::SocketAddr;
10use std::sync::Arc;
11use std::time::Duration;
12
13use axum::{
14 extract::State,
15 http::StatusCode,
16 response::{IntoResponse, Response},
17 routing::post,
18 Json, Router,
19};
20use reqwest::Client;
21use serde_json::{json, Value};
22use tokio::net::TcpListener;
23use tower_http::cors::{Any, CorsLayer};
24use tracing::{error, info, warn};
25
26use tidepool_rpc::cache::{CacheStore, MemoryCache};
27use tidepool_rpc::cnft::{CnftStore, MemoryCnftStore, SqliteCnftStore};
28use tidepool_rpc::das::{AccountDecoder, MplCoreDecoder, TokenMetadataDecoder};
29use tidepool_rpc::sqlite_backend::SqliteBackend;
30use tidepool_rpc::sqlite_cache::SqliteCache;
31use tidepool_rpc::upstream::UpstreamClient;
32use tidepool_rpc::webhooks::{MemoryWebhookRegistry, SqliteWebhookRegistry, WebhookRegistry};
33
34use crate::config::ServerConfig;
35use crate::dispatcher::{dispatch, Ctx};
36use crate::json_rpc::{fail, JsonRpcRequest};
37use crate::upstream_http::HttpUpstream;
38
39#[allow(clippy::too_many_lines)]
42pub async fn run(config: ServerConfig) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
43 let (cnft, cache, webhook_registry): (
48 Arc<dyn CnftStore>,
49 Arc<dyn CacheStore>,
50 Arc<dyn WebhookRegistry>,
51 ) = if let Some(db) = &config.db {
52 info!("tidepool persisting state at {}", db.display());
53 let backend = SqliteBackend::open(db)?;
54 (
55 Arc::new(SqliteCnftStore::new(&backend)),
56 Arc::new(SqliteCache::new(&backend)),
57 Arc::new(SqliteWebhookRegistry::new(&backend).await?),
58 )
59 } else {
60 (
61 Arc::new(MemoryCnftStore::new()),
62 Arc::new(MemoryCache::new()),
63 Arc::new(MemoryWebhookRegistry::new()),
64 )
65 };
66
67 let upstream: Arc<dyn UpstreamClient> = Arc::new(
68 HttpUpstream::new(config.upstream_url.clone(), config.rpc_timeout)?
69 .with_offchain_metadata(config.offchain_metadata),
70 );
71 let decoders: Arc<[Arc<dyn AccountDecoder>]> = Arc::from(vec![
72 Arc::new(MplCoreDecoder) as Arc<dyn AccountDecoder>,
73 Arc::new(TokenMetadataDecoder) as Arc<dyn AccountDecoder>,
74 ]);
75
76 let poster: Arc<dyn tidepool_rpc::webhooks::PostClient> = Arc::new(
77 crate::webhook_runtime::ReqwestPostClient::new(config.rpc_timeout),
78 );
79 let webhooks = Arc::new(crate::webhook_runtime::WebhookRuntime::new(
80 webhook_registry,
81 Arc::clone(&upstream),
82 poster,
83 ));
84
85 let ctx = Ctx {
86 cnft,
87 cache,
88 upstream,
89 decoders,
90 webhooks,
91 };
92
93 for snap_path in &config.snapshots {
99 match std::fs::read(snap_path) {
100 Ok(bytes) => match serde_json::from_slice::<tidepool_rpc::cnft::SnapshotBlob>(&bytes) {
101 Ok(blob) => match blob.into_tree_snapshot() {
102 Ok(snapshot) => {
103 match tidepool_rpc::cnft::load_tree(&*ctx.cnft, snapshot).await {
104 Ok(summary) => info!(
105 path = %snap_path.display(),
106 tree = %bs58::encode(summary.tree).into_string(),
107 leaves = summary.leaf_count,
108 "loaded snapshot"
109 ),
110 Err(e) => {
111 warn!(path = %snap_path.display(), err = %e, "snapshot apply failed");
112 }
113 }
114 }
115 Err(e) => {
116 warn!(path = %snap_path.display(), err = %e, "snapshot decode failed");
117 }
118 },
119 Err(e) => warn!(path = %snap_path.display(), err = %e, "snapshot parse failed"),
120 },
121 Err(e) => warn!(path = %snap_path.display(), err = %e, "snapshot read failed"),
122 }
123 }
124
125 for tree in &config.index_trees {
128 let tree = tree.clone();
129 let ctx_clone = ctx.clone();
130 tokio::spawn(async move {
131 match bs58::decode(&tree).into_vec() {
132 Ok(v) if v.len() == 32 => {
133 let mut bytes = [0u8; 32];
134 bytes.copy_from_slice(&v);
135 let opts = tidepool_rpc::cnft::IndexTreeOptions::default();
136 match tidepool_rpc::cnft::index_tree(
137 &*ctx_clone.upstream,
138 &*ctx_clone.cnft,
139 bytes,
140 &opts,
141 )
142 .await
143 {
144 Ok(r) => info!(
145 tree = %tree,
146 processed = r.processed,
147 applied = r.applied,
148 "indexed tree"
149 ),
150 Err(e) => warn!(tree = %tree, err = %e, "failed to index tree"),
151 }
152 }
153 _ => warn!(tree = %tree, "invalid tree pubkey; skipping indexing"),
154 }
155 });
156 }
157
158 let upstream_url = config.upstream_url.clone();
159 let passthrough_client = Client::builder().timeout(config.rpc_timeout).build()?;
160
161 let state = AppState {
162 ctx,
163 passthrough_url: upstream_url,
164 passthrough_client,
165 };
166
167 let cors = CorsLayer::new()
168 .allow_origin(Any)
169 .allow_methods(Any)
170 .allow_headers(Any);
171
172 let rest_ctx = Arc::new(state.ctx.clone());
179
180 let app: Router = Router::new()
181 .route("/", post(handle_post))
182 .merge(crate::rest::router::<AppState>())
183 .layer(axum::Extension(rest_ctx))
184 .layer(cors)
185 .with_state(state);
186
187 let ws_port = config.ws_port.unwrap_or(config.port + 1);
193 let upstream_ws = config.upstream_ws_url.clone();
194 let ws_timeout = config.rpc_timeout;
195 tokio::spawn(async move {
196 if let Err(e) = crate::ws::run_ws(ws_port, upstream_ws, ws_timeout).await {
197 tracing::error!(err = %e, "ws server exited with error");
198 }
199 });
200
201 let addr = SocketAddr::from(([0, 0, 0, 0], config.port));
202 let listener = TcpListener::bind(&addr).await?;
203 info!("tidepool listening on http://{addr} (ws on :{ws_port})");
204 axum::serve(listener, app).await?;
205 Ok(())
206}
207
208#[derive(Clone)]
209struct AppState {
210 ctx: Ctx<dyn CnftStore, dyn CacheStore, dyn UpstreamClient>,
211 passthrough_url: String,
212 passthrough_client: Client,
213}
214
215async fn handle_post(State(state): State<AppState>, body: axum::body::Bytes) -> Response {
216 let Ok(req) = serde_json::from_slice::<JsonRpcRequest>(&body) else {
217 return passthrough(&state, &body).await;
221 };
222
223 match dispatch(&state.ctx, &req).await {
224 Some(response_json) => Json(response_json).into_response(),
225 None => passthrough(&state, &body).await,
226 }
227}
228
229async fn passthrough(state: &AppState, body: &axum::body::Bytes) -> Response {
230 match state
231 .passthrough_client
232 .post(&state.passthrough_url)
233 .header("content-type", "application/json")
234 .body(body.clone())
235 .send()
236 .await
237 {
238 Ok(upstream_resp) => {
239 let status = upstream_resp.status();
240 match upstream_resp.bytes().await {
241 Ok(bytes) => {
242 let mut resp = Response::new(axum::body::Body::from(bytes));
243 *resp.status_mut() = status;
244 resp.headers_mut().insert(
245 axum::http::header::CONTENT_TYPE,
246 axum::http::HeaderValue::from_static("application/json"),
247 );
248 resp
249 }
250 Err(e) => {
251 error!(err = %e, "failed to read upstream body");
252 failure_response(502, "Upstream body read failed")
253 }
254 }
255 }
256 Err(e) => {
257 error!(err = %e, "upstream unreachable");
258 failure_response(502, &format!("Surfpool unreachable: {e}"))
259 }
260 }
261}
262
263fn failure_response(status: u16, message: &str) -> Response {
264 let body = fail(
265 &Value::Null,
266 crate::json_rpc::codes::INTERNAL_ERROR,
267 message,
268 );
269 let mut resp = Json(body).into_response();
270 *resp.status_mut() = StatusCode::from_u16(status).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
271 resp
272}
273
274#[allow(dead_code)]
278fn _duration_use(_: Duration) {}
279
280#[allow(dead_code)]
283fn _json_use() -> Value {
284 json!(null)
285}