use std::{net::SocketAddr, str::FromStr};
use std::sync::Arc;
use axum::{
extract::State,
http::StatusCode,
response::IntoResponse,
routing::post,
Json,
Router,
};
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use solana_sdk::pubkey::Pubkey;
use super::store::{StoreError, TapeStore};
#[repr(i64)]
#[derive(Copy, Clone)]
pub enum ErrorCode {
ParseError = -32700,
InvalidRequest = -32600,
MethodNotFound = -32601,
InvalidParams = -32602,
InternalError = -32603,
ServerError = -32000,
}
impl ErrorCode {
pub fn code(self) -> i64 {
self as i64
}
}
#[derive(Deserialize)]
struct RpcRequest {
method: String,
params: Value,
id: Option<Value>,
}
#[derive(Serialize)]
pub struct RpcError {
code: i64,
message: String,
}
#[derive(Serialize)]
struct RpcResponse {
jsonrpc: String,
#[serde(skip_serializing_if = "Option::is_none")]
result: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
error: Option<RpcError>,
id: Option<Value>,
}
fn make_response(
id: Option<Value>,
result: Result<Value, RpcError>,
) -> (StatusCode, Json<RpcResponse>) {
let (res, err) = match result {
Ok(val) => (Some(val), None),
Err(e) => (None, Some(e)),
};
let resp = RpcResponse {
jsonrpc: "2.0".into(),
result: res,
error: err,
id,
};
(StatusCode::OK, Json(resp))
}
pub fn rpc_get_health(store: &TapeStore, _params: &Value) -> Result<Value, RpcError> {
let (last_processed_slot, drift) = store
.get_health()
.map_err(|e| RpcError {
code: ErrorCode::ServerError.code(),
message: e.to_string(),
})?;
Ok(json!({ "last_processed_slot": last_processed_slot, "drift": drift }))
}
pub fn rpc_get_tape_address(store: &TapeStore, params: &Value) -> Result<Value, RpcError> {
let tn = params
.get("tape_number")
.and_then(Value::as_u64)
.ok_or(RpcError {
code: ErrorCode::InvalidParams.code(),
message: "invalid or missing tape_number".into(),
})?;
store
.get_tape_address(tn)
.map(|pk| json!(pk.to_string()))
.map_err(|e| match e {
StoreError::TapeNotFound(n) => RpcError {
code: ErrorCode::ServerError.code(),
message: format!("tape {} not found", n),
},
other => RpcError {
code: ErrorCode::ServerError.code(),
message: other.to_string(),
},
})
}
pub fn rpc_get_tape_number(store: &TapeStore, params: &Value) -> Result<Value, RpcError> {
let addr = params
.get("tape_address")
.and_then(Value::as_str)
.ok_or(RpcError {
code: ErrorCode::InvalidParams.code(),
message: "invalid or missing tape_address".into(),
})?;
let pk = Pubkey::from_str(addr).map_err(|e| RpcError {
code: ErrorCode::InvalidParams.code(),
message: format!("invalid pubkey: {}", e),
})?;
store
.get_tape_number(&pk)
.map(|num| json!(num))
.map_err(|e| match e {
StoreError::TapeNotFoundForAddress(_) => RpcError {
code: ErrorCode::ServerError.code(),
message: "tape not found for address".into(),
},
other => RpcError {
code: ErrorCode::ServerError.code(),
message: other.to_string(),
},
})
}
pub fn rpc_get_segment(store: &TapeStore, params: &Value) -> Result<Value, RpcError> {
let addr = params
.get("tape_address")
.and_then(Value::as_str)
.ok_or(RpcError {
code: ErrorCode::InvalidParams.code(),
message: "invalid or missing tape_address".into(),
})?;
let sn = params
.get("segment_number")
.and_then(Value::as_u64)
.ok_or(RpcError {
code: ErrorCode::InvalidParams.code(),
message: "invalid or missing segment_number".into(),
})?;
let pk = Pubkey::from_str(addr).map_err(|e| RpcError {
code: ErrorCode::InvalidParams.code(),
message: format!("invalid pubkey: {}", e),
})?;
store
.get_segment(&pk, sn)
.map(|data| json!(base64::encode(data)))
.map_err(|e| match e {
StoreError::SegmentNotFound(_, num) => RpcError {
code: ErrorCode::ServerError.code(),
message: format!("segment {} not found", num),
},
other => RpcError {
code: ErrorCode::ServerError.code(),
message: other.to_string(),
},
})
}
pub fn rpc_get_tape(store: &TapeStore, params: &Value) -> Result<Value, RpcError> {
let addr = params
.get("tape_address")
.and_then(Value::as_str)
.ok_or(RpcError {
code: ErrorCode::InvalidParams.code(),
message: "invalid or missing tape_address".into(),
})?;
let pk = Pubkey::from_str(addr).map_err(|e| RpcError {
code: ErrorCode::InvalidParams.code(),
message: format!("invalid pubkey: {}", e),
})?;
let segments = store.get_tape_segments(&pk).map_err(|e| RpcError {
code: ErrorCode::ServerError.code(),
message: e.to_string(),
})?;
let arr: Vec<Value> = segments
.into_iter()
.map(|(num, data)| {
json!({
"segment_number": num,
"data": base64::encode(data),
})
})
.collect();
Ok(json!(arr))
}
async fn rpc_handler(
State(store): State<Arc<TapeStore>>,
Json(req): Json<RpcRequest>,
) -> impl IntoResponse {
let id = req.id.clone();
let outcome = match req.method.as_str() {
"getHealth" => rpc_get_health(&store, &req.params),
"getTapeAddress" => rpc_get_tape_address(&store, &req.params),
"getTapeNumber" => rpc_get_tape_number(&store, &req.params),
"getSegment" => rpc_get_segment(&store, &req.params),
"getTape" => rpc_get_tape(&store, &req.params),
_ => Err(RpcError {
code: ErrorCode::MethodNotFound.code(),
message: "method not found".into(),
}),
};
make_response(id, outcome)
}
pub async fn web_loop(
store: TapeStore,
port: u16,
) -> anyhow::Result<()> {
let store = Arc::new(store);
{
let store = Arc::clone(&store);
tokio::spawn(async move {
let interval = std::time::Duration::from_secs(15);
loop {
store.catch_up_with_primary().unwrap();
tokio::time::sleep(interval).await;
}
});
}
let app = Router::new()
.route("/api", post(rpc_handler))
.with_state(store);
let addr = SocketAddr::from(([127, 0, 0, 1], port));
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app).await?;
Ok(())
}