use std::sync::Arc;
use tonic::{Request, Response, Status};
use orlando_core::GrainActivator;
use crate::message_registry::MessageRegistry;
use crate::network_message::Encoding;
use crate::proto::cluster_gateway_server::ClusterGateway;
use crate::proto::{
ClusterPingRequest, ClusterPingResponse, DrainAck, DrainNotification,
ForwardInvokeRequest, ForwardInvokeResponse,
};
pub struct ClusterGatewayService {
registry: Arc<MessageRegistry>,
activator: Arc<dyn GrainActivator>,
local_cluster_id: String,
}
impl ClusterGatewayService {
pub fn new(
registry: Arc<MessageRegistry>,
activator: Arc<dyn GrainActivator>,
local_cluster_id: String,
) -> Self {
Self {
registry,
activator,
local_cluster_id,
}
}
}
#[tonic::async_trait]
impl ClusterGateway for ClusterGatewayService {
async fn forward_invoke(
&self,
request: Request<ForwardInvokeRequest>,
) -> Result<Response<ForwardInvokeResponse>, Status> {
let req = request.into_inner();
let encoding = Encoding::from_proto(req.encoding);
tracing::debug!(
grain_type = %req.grain_type,
grain_key = %req.grain_key,
source_cluster = %req.source_cluster_id,
"handling cross-cluster forward invoke"
);
match self
.registry
.dispatch(
&req.grain_type,
req.grain_key,
&req.message_type,
req.message_version,
req.payload,
encoding,
req.request_context,
self.activator.clone(),
)
.await
{
Ok((payload, enc)) => Ok(Response::new(ForwardInvokeResponse {
payload,
error: String::new(),
encoding: enc.to_proto(),
})),
Err(e) => Ok(Response::new(ForwardInvokeResponse {
payload: Vec::new(),
error: e.to_string(),
encoding: encoding.to_proto(),
})),
}
}
async fn cluster_ping(
&self,
_request: Request<ClusterPingRequest>,
) -> Result<Response<ClusterPingResponse>, Status> {
Ok(Response::new(ClusterPingResponse {
cluster_id: self.local_cluster_id.clone(),
silo_count: 1,
active_grains: 0,
state: crate::proto::ClusterState::Healthy.into(),
}))
}
async fn notify_drain(
&self,
request: Request<DrainNotification>,
) -> Result<Response<DrainAck>, Status> {
let drain = request.into_inner();
tracing::info!(
draining_cluster = %drain.cluster_id,
grain_count = drain.grains.len(),
"received drain notification from peer cluster"
);
Ok(Response::new(DrainAck {}))
}
}