use crate::config::{Config, RpcServerAuth};
use crate::db::Database;
use crate::error::{Error, Result};
use crate::pool::PoolManager;
use crate::stats::Stats;
use axum::{
extract::{ConnectInfo, State},
http::{header, HeaderMap, StatusCode},
response::{IntoResponse, Response},
routing::{get, post},
Json, Router,
};
use base64::{engine::general_purpose::STANDARD, Engine as _};
use log::{debug, error, info, warn};
use pocx_protocol::{
JsonRpcError, JsonRpcId, JsonRpcRequest, JsonRpcResponse, MiningInfo, SubmitNonceParams,
SubmitNonceResult, METHOD_GET_MINING_INFO, METHOD_SUBMIT_NONCE,
};
use serde_json::Value;
use std::net::SocketAddr;
pub struct AggregatorServer {
config: Config,
pool_manager: PoolManager,
stats: Stats,
database: Database,
}
impl AggregatorServer {
pub fn stats(&self) -> &Stats {
&self.stats
}
}
#[derive(Clone)]
struct AppState {
pool_manager: PoolManager,
stats: Stats,
current_base_target: Arc<RwLock<u64>>, current_block_hash: Arc<RwLock<String>>, database: Database,
current_height: Arc<RwLock<u64>>, retention_blocks: u64, server_auth: RpcServerAuth, }
use std::sync::Arc;
use tokio::sync::RwLock;
impl AggregatorServer {
pub async fn new(config: Config) -> Result<Self> {
let pool_manager = PoolManager::new(
&config.upstream,
config.cache.mining_info_ttl_secs,
config.cache.pool_timeout_secs,
)?;
let stats = Stats::new(config.upstream.block_time_secs);
let database = Database::new(&config.database.path)?;
info!("Loading historical submissions from database...");
match database.get_all_recent_submissions(1000) {
Ok(submissions) => {
info!("Loaded {} historical submissions", submissions.len());
for sub in submissions {
stats
.record_submission(
&sub.account_id,
Some(sub.machine_id),
sub.raw_quality as u64,
sub.height as u64,
)
.await;
}
info!("Historical data loaded successfully");
}
Err(e) => {
error!("Failed to load historical submissions: {}", e);
}
}
Ok(Self {
config,
pool_manager,
stats,
database,
})
}
pub async fn run(self) -> Result<()> {
let retention_blocks = self.config.retention_blocks();
let listen_address = self.config.server.listen_address.clone();
let server_auth = self.config.server.auth.clone();
let dashboard_enabled = self
.config
.dashboard
.as_ref()
.map(|d| d.enabled)
.unwrap_or(false);
if server_auth.is_required() {
info!("Server authentication: ENABLED (BasicAuth)");
} else {
info!("Server authentication: DISABLED");
}
let state = AppState {
pool_manager: self.pool_manager,
stats: self.stats,
current_base_target: Arc::new(RwLock::new(1)), current_block_hash: Arc::new(RwLock::new(String::new())),
database: self.database,
current_height: Arc::new(RwLock::new(0)), retention_blocks,
server_auth,
};
let mut app = Router::new()
.route("/", post(handle_jsonrpc))
.route("/health", get(health_check));
if dashboard_enabled {
app = app.route("/stats", get(get_stats));
}
let app = app.with_state(state);
let listener = tokio::net::TcpListener::bind(&listen_address)
.await
.map_err(|e| Error::Server(format!("Failed to bind to {}: {}", listen_address, e)))?;
info!("Aggregator listening on {}", listen_address);
crate::callback::with_callback(|cb| {
cb.on_started(&crate::callback::AggregatorStartedInfo {
listen_address: listen_address.clone(),
upstream_name: self.config.upstream.name.clone(),
});
});
let server = axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.with_graceful_shutdown(shutdown_signal());
server
.await
.map_err(|e| Error::Server(format!("Server error: {}", e)))?;
info!("Server shutdown complete");
crate::callback::with_callback(|cb| cb.on_stopped());
Ok(())
}
}
async fn handle_jsonrpc(
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(state): State<AppState>,
headers: HeaderMap,
Json(request): Json<Value>,
) -> Response {
let client_ip = addr.ip().to_string();
debug!("Received JSON-RPC request: {}", request);
if state.server_auth.is_required() {
if let Err(response) = validate_basic_auth(&headers, &state.server_auth, &client_ip) {
return response;
}
}
let method = match request.get("method").and_then(|m| m.as_str()) {
Some(m) => m,
None => {
return json_rpc_error(
JsonRpcError {
code: -32600,
message: "Invalid Request: missing method".to_string(),
data: None,
},
JsonRpcId::Null,
);
}
};
let id = request
.get("id")
.and_then(|id| {
if id.is_string() {
id.as_str().map(|s| JsonRpcId::from_string(s.to_string()))
} else if id.is_number() {
id.as_u64().map(JsonRpcId::from_number)
} else {
Some(JsonRpcId::Null)
}
})
.unwrap_or(JsonRpcId::Null);
match method {
METHOD_GET_MINING_INFO => handle_get_mining_info(state, request, id, client_ip).await,
METHOD_SUBMIT_NONCE => handle_submit_nonce(state, request, id, client_ip).await,
_ => json_rpc_error(
JsonRpcError {
code: -32601,
message: format!("Method not found: {}", method),
data: None,
},
id,
),
}
}
async fn handle_get_mining_info(
state: AppState,
_request: Value,
id: JsonRpcId,
_client_ip: String,
) -> Response {
match state.pool_manager.get_mining_info().await {
Ok(info) => {
state.stats.update_height(info.height).await;
state.stats.update_base_target(info.base_target).await;
*state.current_base_target.write().await = info.base_target;
*state.current_block_hash.write().await = info.block_hash.clone();
let old_height = *state.current_height.read().await;
*state.current_height.write().await = info.height;
if info.height != old_height {
crate::callback::with_callback(|cb| {
cb.on_new_block(&crate::callback::BlockUpdate {
height: info.height,
base_target: info.base_target,
});
});
let snapshot = state.stats.snapshot().await;
crate::callback::with_callback(|cb| cb.on_stats_updated(&snapshot));
}
if info.height != old_height && state.retention_blocks > 0 {
if let Err(e) = state
.database
.cleanup_old_submissions(info.height, state.retention_blocks)
{
error!("Failed to cleanup old submissions: {}", e);
}
}
let response: JsonRpcResponse<MiningInfo> = JsonRpcResponse::success(info, id);
Json(response).into_response()
}
Err(e) => {
error!("Failed to get mining info: {}", e);
crate::callback::with_callback(|cb| {
cb.on_error(&format!("Mining info fetch failed: {}", e))
});
json_rpc_error(
JsonRpcError {
code: -32000,
message: "Failed to get mining info".to_string(),
data: Some(serde_json::json!({ "error": e.to_string() })),
},
id,
)
}
}
}
async fn handle_submit_nonce(
state: AppState,
request: Value,
id: JsonRpcId,
client_ip: String,
) -> Response {
let req: JsonRpcRequest<SubmitNonceParams> = match serde_json::from_value(request) {
Ok(r) => r,
Err(e) => {
return json_rpc_error(
JsonRpcError {
code: -32600,
message: "Invalid Request".to_string(),
data: Some(serde_json::json!({ "error": e.to_string() })),
},
id,
);
}
};
crate::callback::with_callback(|cb| {
cb.on_submission_received(&crate::callback::SubmissionInfo {
height: req.params.height,
account_id: req.params.account_id.clone(),
machine_id: Some(client_ip.clone()),
generation_signature: req.params.generation_signature.clone(),
seed: req.params.seed.clone(),
nonce: req.params.nonce,
compression: req.params.compression,
raw_quality: req.params.raw_quality,
});
});
match state.pool_manager.submit_nonce(req.params.clone()).await {
Ok(result) => {
let machine_id = Some(client_ip);
state
.stats
.record_submission(
&req.params.account_id,
machine_id.clone(),
result.raw_quality,
req.params.height,
)
.await;
if let Err(e) = state.database.save_submission(
&req.params.account_id,
machine_id.clone(),
result.raw_quality,
req.params.height,
) {
error!("Failed to queue submission save: {}", e);
}
crate::callback::with_callback(|cb| {
cb.on_submission_accepted(&crate::callback::AcceptedInfo {
height: req.params.height,
account_id: req.params.account_id.clone(),
machine_id,
generation_signature: req.params.generation_signature.clone(),
seed: req.params.seed.clone(),
nonce: req.params.nonce,
compression: req.params.compression,
raw_quality: result.raw_quality,
poc_time: result.poc_time,
});
});
let snapshot = state.stats.snapshot().await;
crate::callback::with_callback(|cb| cb.on_stats_updated(&snapshot));
let response: JsonRpcResponse<SubmitNonceResult> = JsonRpcResponse::success(result, id);
Json(response).into_response()
}
Err(e) => {
error!("Failed to submit nonce: {}", e);
crate::callback::with_callback(|cb| {
cb.on_submission_rejected(&crate::callback::RejectedInfo {
height: req.params.height,
account_id: req.params.account_id.clone(),
machine_id: Some(client_ip.clone()),
reason: e.to_string(),
});
});
json_rpc_error(
JsonRpcError {
code: -32000,
message: "Failed to submit nonce".to_string(),
data: Some(serde_json::json!({ "error": e.to_string() })),
},
id,
)
}
}
}
async fn health_check() -> impl IntoResponse {
(StatusCode::OK, "OK")
}
async fn get_stats(State(state): State<AppState>) -> impl IntoResponse {
let snapshot = state.stats.snapshot().await;
Json(snapshot)
}
fn json_rpc_error(error: JsonRpcError, id: JsonRpcId) -> Response {
let response: JsonRpcResponse<()> = JsonRpcResponse::error(error, id);
(StatusCode::OK, Json(response)).into_response()
}
#[allow(clippy::result_large_err)]
fn validate_basic_auth(
headers: &HeaderMap,
auth_config: &RpcServerAuth,
client_ip: &str,
) -> std::result::Result<(), Response> {
let auth_header = headers.get(header::AUTHORIZATION);
let auth_value = match auth_header {
Some(value) => value.to_str().unwrap_or(""),
None => {
warn!(
"Auth required but no Authorization header from {}",
client_ip
);
return Err(json_rpc_error(
JsonRpcError {
code: -32004,
message: "Authentication required".to_string(),
data: None,
},
JsonRpcId::Null,
));
}
};
if !auth_value.starts_with("Basic ") {
warn!("Invalid auth scheme from {}", client_ip);
return Err(json_rpc_error(
JsonRpcError {
code: -32005,
message: "Invalid authentication scheme".to_string(),
data: None,
},
JsonRpcId::Null,
));
}
let encoded = &auth_value[6..];
let decoded = match STANDARD.decode(encoded) {
Ok(bytes) => String::from_utf8_lossy(&bytes).to_string(),
Err(_) => {
warn!("Invalid base64 in auth header from {}", client_ip);
return Err(json_rpc_error(
JsonRpcError {
code: -32005,
message: "Invalid authentication credentials".to_string(),
data: None,
},
JsonRpcId::Null,
));
}
};
let parts: Vec<&str> = decoded.splitn(2, ':').collect();
if parts.len() != 2 {
warn!("Malformed credentials from {}", client_ip);
return Err(json_rpc_error(
JsonRpcError {
code: -32005,
message: "Invalid authentication credentials".to_string(),
data: None,
},
JsonRpcId::Null,
));
}
let (username, password) = (parts[0], parts[1]);
if auth_config.validate_credentials(username, password) {
debug!("Auth successful for user '{}' from {}", username, client_ip);
Ok(())
} else {
warn!("Auth failed for user '{}' from {}", username, client_ip);
Err(json_rpc_error(
JsonRpcError {
code: -32005,
message: "Invalid authentication credentials".to_string(),
data: None,
},
JsonRpcId::Null,
))
}
}
async fn shutdown_signal() {
use tokio::signal;
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
let stop_poll = async {
loop {
if crate::control::is_stop_requested() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
};
tokio::select! {
_ = ctrl_c => {
info!("Received Ctrl+C, shutting down gracefully...");
},
_ = terminate => {
info!("Received SIGTERM, shutting down gracefully...");
},
_ = stop_poll => {
info!("Stop requested via API, shutting down gracefully...");
},
}
}