use remote_hash_map::common::utils::get_endpoint;
use remote_hash_map::rhm::rhm::{Rhm, RhmResult};
use remote_hash_map::rpc::node_group_rpc::node_group_rpc_client::NodeGroupRpcClient;
use remote_hash_map::rpc::node_group_rpc::node_group_rpc_client::NodeGroupRpcClient as NGClient;
use remote_hash_map::rpc::node_group_rpc::{AddServerRequest, GetServerRequest, ReplicateRequest};
use remote_hash_map::rpc::node_rpc::node_rpc_server::{NodeRpc, NodeRpcServer};
use remote_hash_map::rpc::node_rpc::{GetRequest, GetResponse, PingRequest, PingResponse, SetRequest, SetResponse};
use log::{debug, error, info, warn};
use std::net::SocketAddr;
use std::sync::Arc;
use structopt::StructOpt;
use thiserror::Error;
use tokio::sync::Mutex;
use tonic::transport::{Channel, Endpoint, Server};
use tonic::{Request, Response, Status};
use tonic_reflection::server::Builder as ReflectionBuilder;
/// Encoded protobuf file descriptor set for the node RPC service, embedded at
/// compile time and registered with the gRPC reflection service in `main`.
pub const FILE_DESCRIPTOR_SET: &[u8] = include_bytes!("../../proto/node_rpc_descriptor.bin");
/// Errors produced while starting or running a node.
#[derive(Error, Debug)]
pub enum NodeError {
    /// An operation on the underlying hash map failed; carries the rendered cause.
    #[error("RHM operation failed: {0}")]
    RhmError(String),

    /// A request to the NodeGroup failed, or no NodeGroup is configured.
    #[error("Node group communication failed: {0}")]
    NodeGroupError(String),

    /// A tonic transport-level failure (converted automatically via `From`).
    #[error("Tonic transport error: {0}")]
    TonicError(#[from] tonic::transport::Error),

    /// Any other failure that does not fit the categories above.
    #[error("Internal error: {0}")]
    InternalError(String),
}
/// Command-line options for a Remote Hash Map node.
///
/// The doc comments below double as the `--help` text generated by structopt.
#[derive(StructOpt, Debug)]
#[structopt(name = "Node")]
struct Opt {
    /// Socket address this node listens on for gRPC requests
    #[structopt(long, parse(try_from_str), default_value = "127.0.0.1:6000")]
    listen: SocketAddr,

    /// Socket address of the NodeGroup to join; omit to run in standalone mode
    #[structopt(long, parse(try_from_str))]
    ng: Option<SocketAddr>,
}
/// State backing the node's gRPC service.
#[derive(Debug)]
pub struct ImplNodeRpc {
    /// The hash map served by this node, shared behind an async mutex.
    rhm: Arc<Mutex<Rhm>>,
    /// This node's own address; sent to the NodeGroup when registering and as
    /// the `source` of replication requests.
    addr: SocketAddr,
    /// Endpoint of the NodeGroup, or `None` when no NodeGroup is configured
    /// (in which case writes are not replicated).
    pub ng: Option<Endpoint>,
}
/// gRPC handlers for the node service: `set`, `get` and `ping`.
#[tonic::async_trait]
impl NodeRpc for ImplNodeRpc {
    /// Stores `key = value` locally and, unless the request is itself a
    /// replication or no NodeGroup is configured, asks the NodeGroup to
    /// replicate the write.
    ///
    /// # Errors
    ///
    /// Returns `Status::internal` if the local write fails or if the NodeGroup
    /// rejects the replication request. Failure to *connect* to the NodeGroup
    /// is only logged as a warning and the request still succeeds.
    async fn set(&self, request: Request<SetRequest>) -> Result<Response<SetResponse>, Status> {
        let req = request.into_inner();
        info!("Received set request: key = {}, value = {}", req.key, req.value);

        // Hold the map lock only for the local write. Keeping it across the
        // NodeGroup round-trip below would stall every other get/set on this
        // node for the duration of a network call, and could deadlock two nodes
        // against each other if the NodeGroup forwards writes synchronously.
        let mut rhm = self.rhm.lock().await;
        let outcome = rhm.set(&req.key, &req.value).await;
        drop(rhm);

        let result = outcome.map_err(|e| {
            error!("Failed to set value: {}", e);
            Status::internal(format!("Failed to set value: {}", e))
        })?;
        info!("Set request successful: key = {}", req.key);

        // A write that arrived via replication must not be replicated again,
        // and a node without a NodeGroup has nobody to replicate to.
        if req.replication || self.ng.is_none() {
            debug!("Skipping replication for key: {}", req.key);
            return Ok(Response::new(SetResponse { result: result.value() }));
        }

        info!("Sending request for replication: key = {}, value = {}", &req.key, &req.value);
        match self.ng().await {
            Ok(mut client) => {
                debug!("Sending replication request to NodeGroup");
                client
                    .replicate(ReplicateRequest {
                        key: req.key.clone(),
                        value: req.value.clone(),
                        source: self.addr.to_string(),
                    })
                    .await
                    .map_err(|e| Status::internal(format!("Replication failed: {}", e)))?;
                info!("Replication request sent successfully for key: {}", req.key);
            }
            Err(e) => {
                // Best effort: the local write already succeeded.
                warn!("Can't replicate: NodeGroup not available: {}", e);
            }
        }

        Ok(Response::new(SetResponse { result: result.value() }))
    }

    /// Looks up `key` in the local map.
    ///
    /// `found` is `true` only when the lookup yields `RhmResult::Value`.
    async fn get(&self, request: Request<GetRequest>) -> Result<Response<GetResponse>, Status> {
        let req = request.into_inner();
        let rhm = self.rhm.lock().await;
        info!("Received get request: key = {}", req.key);

        let result = rhm.get(&req.key).await;
        let found = matches!(result, RhmResult::Value(_));
        info!("Get request result: key = {}, found = {}", req.key, found);

        Ok(Response::new(GetResponse { value: result.value(), found }))
    }

    /// Liveness probe; always answers with `"Pong"`.
    async fn ping(&self, request: Request<PingRequest>) -> Result<Response<PingResponse>, Status> {
        info!("Received ping request from: {:?}", request.into_inner());
        Ok(Response::new(PingResponse { result: "Pong".to_string() }))
    }
}
impl ImplNodeRpc {
async fn ng(&self) -> Result<NodeGroupRpcClient<Channel>, NodeError> {
let ng = self.ng.clone();
let ng = ng.ok_or(NodeError::NodeGroupError("No NodeGroup found".to_string()))?;
debug!("Connecting to NodeGroup at {:?}", ng);
NGClient::connect(ng).await.map_err(NodeError::TonicError)
}
pub async fn attach_to_group(&self) -> Result<(), NodeError> {
info!("Attaching to NodeGroup");
let mut client = self.ng().await?;
client
.add_server(AddServerRequest { addr: self.addr.to_string() })
.await
.map_err(|e| NodeError::NodeGroupError(e.to_string()))?;
let response = client.get_server(GetServerRequest {}).await.map_err(|e| NodeError::NodeGroupError(e.to_string()))?;
info!("Attached to group: {:?}", response);
Ok(())
}
pub fn new(rhm: Rhm, addr: SocketAddr) -> Self {
debug!("Creating new ImplNodeRpc instance");
Self {
rhm: Arc::new(Mutex::new(rhm)),
addr,
ng: None,
}
}
}
#[tokio::main]
#[allow(dead_code)]
async fn main() -> Result<(), NodeError> {
env_logger::Builder::from_default_env().filter_level(log::LevelFilter::Info).init();
let opt = Opt::from_args();
let addr = opt.listen;
info!("Initializing node with address: {}", addr);
let rhm = Rhm::new(&addr.to_string()).await.map_err(|e| NodeError::RhmError(e.to_string()))?;
let mut node_rpc = ImplNodeRpc::new(rhm, addr);
if let Some(ng_addr) = opt.ng {
info!("NodeGroup address provided: {}", ng_addr);
let endpoint = get_endpoint(&ng_addr.to_string()).map_err(|e| NodeError::InternalError(e.to_string()))?;
node_rpc.ng = Some(endpoint.clone());
node_rpc.attach_to_group().await?;
} else {
warn!("No NodeGroup address provided. Running in standalone mode.");
}
info!("Node listening on {}", addr);
let reflection_service = match ReflectionBuilder::configure().register_encoded_file_descriptor_set(FILE_DESCRIPTOR_SET).build() {
Ok(service) => service,
Err(e) => {
eprintln!("Failed to build reflection service: {}", e);
panic!()
}
};
Server::builder()
.add_service(NodeRpcServer::new(node_rpc))
.add_service(reflection_service)
.serve(addr)
.await
.map_err(NodeError::TonicError)?;
Ok(())
}