use std::sync::Arc;
use tonic::{Request, Response, Status};
use crate::proto;
use crate::proto::admin_service_server::AdminService;
use super::backend::{GqlBackend, IndexDefinition};
/// Admin service implementation that answers each RPC by delegating to a
/// shared [`GqlBackend`].
pub struct AdminServiceImpl<B>
where
    B: GqlBackend,
{
    /// Backend every handler calls into; held behind an [`Arc`].
    backend: Arc<B>,
}
impl<B: GqlBackend> AdminServiceImpl<B> {
pub fn new(backend: Arc<B>) -> Self {
Self { backend }
}
}
#[tonic::async_trait]
impl<B: GqlBackend> AdminService for AdminServiceImpl<B> {
/// Returns the statistics the backend reports for the requested graph.
///
/// # Errors
///
/// Returns an `invalid_argument` status when the graph name is empty. A
/// backend failure is converted with `to_optional_service_status` and
/// propagated.
#[tracing::instrument(skip(self, request), fields(graph))]
async fn get_graph_stats(
    &self,
    request: Request<proto::GetGraphStatsRequest>,
) -> Result<Response<proto::GetGraphStatsResponse>, Status> {
    let inner = request.into_inner();
    let graph_name = &inner.graph;

    // The span field is filled in before validation, so rejected requests
    // still carry the (empty) name they were sent with.
    tracing::Span::current().record("graph", graph_name);
    if graph_name.is_empty() {
        return Err(Status::invalid_argument("graph name is required"));
    }

    let lookup = self.backend.get_graph_stats(graph_name).await;
    let stats = lookup.map_err(|err| err.to_optional_service_status())?;

    // Copy the backend counters field-for-field into the wire message.
    let reply = proto::GetGraphStatsResponse {
        node_count: stats.node_count,
        edge_count: stats.edge_count,
        label_count: stats.label_count,
        edge_type_count: stats.edge_type_count,
        property_key_count: stats.property_key_count,
        index_count: stats.index_count,
        memory_bytes: stats.memory_bytes,
        disk_bytes: stats.disk_bytes,
    };
    Ok(Response::new(reply))
}
/// Returns the WAL status the backend reports for the requested graph.
///
/// # Errors
///
/// Returns an `invalid_argument` status when the graph name is empty. A
/// backend failure is converted with `to_optional_service_status` and
/// propagated.
#[tracing::instrument(skip(self, request), fields(graph))]
async fn wal_status(
    &self,
    request: Request<proto::WalStatusRequest>,
) -> Result<Response<proto::WalStatusResponse>, Status> {
    let inner = request.into_inner();
    let graph_name = &inner.graph;

    // Record the span field first so invalid requests are traced as well.
    tracing::Span::current().record("graph", graph_name);
    if graph_name.is_empty() {
        return Err(Status::invalid_argument("graph name is required"));
    }

    let lookup = self.backend.wal_status(graph_name).await;
    let wal = lookup.map_err(|err| err.to_optional_service_status())?;

    // Copy the backend status field-for-field into the wire message.
    let reply = proto::WalStatusResponse {
        enabled: wal.enabled,
        path: wal.path,
        size_bytes: wal.size_bytes,
        record_count: wal.record_count,
        last_checkpoint: wal.last_checkpoint,
        current_epoch: wal.current_epoch,
    };
    Ok(Response::new(reply))
}
/// Asks the backend to run a WAL checkpoint for the requested graph and
/// logs an info event once it succeeds.
///
/// # Errors
///
/// Returns an `invalid_argument` status when the graph name is empty. A
/// backend failure is converted with `to_optional_service_status` and
/// propagated.
#[tracing::instrument(skip(self, request), fields(graph))]
async fn wal_checkpoint(
    &self,
    request: Request<proto::WalCheckpointRequest>,
) -> Result<Response<proto::WalCheckpointResponse>, Status> {
    let inner = request.into_inner();
    let graph_name = &inner.graph;

    // Record the span field first so invalid requests are traced as well.
    tracing::Span::current().record("graph", graph_name);
    if graph_name.is_empty() {
        return Err(Status::invalid_argument("graph name is required"));
    }

    let outcome = self.backend.wal_checkpoint(graph_name).await;
    outcome.map_err(|err| err.to_optional_service_status())?;

    tracing::info!(graph = %graph_name, "WAL checkpoint completed");
    let reply = proto::WalCheckpointResponse {};
    Ok(Response::new(reply))
}
/// Runs the backend's validation for the requested graph and returns its
/// verdict together with the reported errors and warnings.
///
/// # Errors
///
/// Returns an `invalid_argument` status when the graph name is empty. A
/// backend failure is converted with `to_optional_service_status` and
/// propagated. Validation findings themselves are part of the successful
/// response, not an error status.
#[tracing::instrument(skip(self, request), fields(graph))]
async fn validate(
    &self,
    request: Request<proto::ValidateRequest>,
) -> Result<Response<proto::ValidateResponse>, Status> {
    let inner = request.into_inner();
    let graph_name = &inner.graph;

    // Record the span field first so invalid requests are traced as well.
    tracing::Span::current().record("graph", graph_name);
    if graph_name.is_empty() {
        return Err(Status::invalid_argument("graph name is required"));
    }

    let outcome = self.backend.validate(graph_name).await;
    let report = outcome.map_err(|err| err.to_optional_service_status())?;

    // Translate backend findings into their wire counterparts.
    let errors = report
        .errors
        .into_iter()
        .map(|issue| proto::ValidationError {
            code: issue.code,
            message: issue.message,
            context: issue.context,
        })
        .collect();
    let warnings = report
        .warnings
        .into_iter()
        .map(|issue| proto::ValidationWarning {
            code: issue.code,
            message: issue.message,
            context: issue.context,
        })
        .collect();

    let reply = proto::ValidateResponse {
        valid: report.valid,
        errors,
        warnings,
    };
    Ok(Response::new(reply))
}
/// Creates the index described by the request on the requested graph and
/// logs an info event once the backend succeeds.
///
/// # Errors
///
/// Returns an `invalid_argument` status when the graph name is empty or
/// when no index definition is supplied. A backend failure is converted
/// with `to_optional_service_status` and propagated.
#[tracing::instrument(skip(self, request), fields(graph))]
async fn create_index(
    &self,
    request: Request<proto::CreateIndexRequest>,
) -> Result<Response<proto::CreateIndexResponse>, Status> {
    let req = request.into_inner();
    let graph_name = &req.graph;

    // Record the span field first so invalid requests are traced as well.
    tracing::Span::current().record("graph", graph_name);
    if graph_name.is_empty() {
        return Err(Status::invalid_argument("graph name is required"));
    }

    // Map the wire-level oneof onto the backend's index description; a
    // missing oneof is rejected before the backend is contacted.
    let index_def = req
        .index
        .map(|index| match index {
            proto::create_index_request::Index::PropertyIndex(spec) => {
                IndexDefinition::Property {
                    property: spec.property,
                }
            }
            proto::create_index_request::Index::VectorIndex(spec) => IndexDefinition::Vector {
                label: spec.label,
                property: spec.property,
                dimensions: spec.dimensions,
                metric: spec.metric,
                m: spec.m,
                ef_construction: spec.ef_construction,
            },
            proto::create_index_request::Index::TextIndex(spec) => IndexDefinition::Text {
                label: spec.label,
                property: spec.property,
            },
        })
        .ok_or_else(|| Status::invalid_argument("index definition is required"))?;

    let outcome = self.backend.create_index(graph_name, index_def).await;
    outcome.map_err(|err| err.to_optional_service_status())?;

    tracing::info!(graph = %graph_name, "index created");
    let reply = proto::CreateIndexResponse {};
    Ok(Response::new(reply))
}
/// Drops the index described by the request from the requested graph.
///
/// The response's `existed` field carries the value the backend returns
/// from `drop_index`. On success an info event is logged, matching the
/// other mutating admin operations in this service.
///
/// # Errors
///
/// Returns an `invalid_argument` status when the graph name is empty or
/// when no index definition is supplied. A backend failure is converted
/// with `to_optional_service_status` and propagated.
#[tracing::instrument(skip(self, request), fields(graph))]
async fn drop_index(
    &self,
    request: Request<proto::DropIndexRequest>,
) -> Result<Response<proto::DropIndexResponse>, Status> {
    let req = request.into_inner();
    // Record the span field first so invalid requests are traced as well.
    tracing::Span::current().record("graph", &req.graph);
    if req.graph.is_empty() {
        return Err(Status::invalid_argument("graph name is required"));
    }

    let index_def = match req.index {
        Some(proto::drop_index_request::Index::PropertyIndex(def)) => {
            IndexDefinition::Property {
                property: def.property,
            }
        }
        // Only label and property are forwarded for a vector index; the
        // tuning parameters are left as `None`.
        Some(proto::drop_index_request::Index::VectorIndex(def)) => IndexDefinition::Vector {
            label: def.label,
            property: def.property,
            dimensions: None,
            metric: None,
            m: None,
            ef_construction: None,
        },
        Some(proto::drop_index_request::Index::TextIndex(def)) => IndexDefinition::Text {
            label: def.label,
            property: def.property,
        },
        None => {
            return Err(Status::invalid_argument("index definition is required"));
        }
    };

    let existed = self
        .backend
        .drop_index(&req.graph, index_def)
        .await
        .map_err(|e| e.to_optional_service_status())?;

    // Dropping an index is destructive; log it like `create_index` and
    // `wal_checkpoint` do, including whether anything was actually removed.
    tracing::info!(graph = %req.graph, existed, "index drop completed");
    Ok(Response::new(proto::DropIndexResponse { existed }))
}
}