use std::collections::HashMap;
use serde_json::Value;
use tonic::{Request, Response, Status, metadata::MetadataMap};
use crate::{
error::EventError,
server::{AppState, notify_plugins, replicate_events},
store::{self, AppendEvent, EventRecord},
token::AccessKind,
};
pub mod proto {
tonic::include_proto!("eventdbx.api");
}
use proto::event_service_server::{EventService, EventServiceServer};
use proto::{
ActorClaims, Aggregate, AppendEventRequest, AppendEventResponse, Event, EventMetadata,
GetAggregateRequest, GetAggregateResponse, HealthRequest, HealthResponse,
ListAggregatesRequest, ListAggregatesResponse, ListEventsRequest, ListEventsResponse,
VerifyAggregateRequest, VerifyAggregateResponse,
};
#[derive(Clone)]
pub struct GrpcApi {
state: AppState,
}
impl GrpcApi {
pub(crate) fn new(state: AppState) -> Self {
Self { state }
}
pub(crate) fn into_server(self) -> EventServiceServer<Self> {
EventServiceServer::new(self)
}
fn map_error(err: EventError) -> Status {
match err {
EventError::InvalidToken | EventError::Unauthorized => {
Status::unauthenticated(err.to_string())
}
EventError::TokenExpired | EventError::TokenLimitReached => {
Status::permission_denied(err.to_string())
}
EventError::AggregateNotFound | EventError::SchemaNotFound => {
Status::not_found(err.to_string())
}
EventError::SchemaViolation(_)
| EventError::InvalidSchema(_)
| EventError::Config(_) => Status::invalid_argument(err.to_string()),
EventError::SchemaExists | EventError::AggregateArchived => {
Status::failed_precondition(err.to_string())
}
EventError::Storage(_) | EventError::Serialization(_) | EventError::Io(_) => {
Status::internal(err.to_string())
}
}
}
fn extract_token(metadata: &MetadataMap) -> Result<String, Status> {
let value = metadata
.get("authorization")
.ok_or_else(|| Status::unauthenticated("missing authorization metadata"))?;
let value = value
.to_str()
.map_err(|_| Status::unauthenticated("invalid authorization metadata"))?;
let prefix = "Bearer ";
if let Some(rest) = value.strip_prefix(prefix) {
Ok(rest.to_string())
} else {
Err(Status::unauthenticated(
"authorization metadata must use Bearer token",
))
}
}
fn convert_event(record: EventRecord) -> Result<Event, Status> {
let metadata = record.metadata.clone();
let issued_by = metadata.issued_by.map(|claims| ActorClaims {
group: claims.group,
user: claims.user,
});
let event = Event {
aggregate_type: record.aggregate_type,
aggregate_id: record.aggregate_id,
event_type: record.event_type,
version: record.version,
payload_json: serde_json::to_string(&record.payload)
.map_err(|err| Status::internal(err.to_string()))?,
metadata: Some(EventMetadata {
event_id: metadata.event_id.to_string(),
created_at: metadata.created_at.to_rfc3339(),
issued_by,
}),
hash: record.hash,
merkle_root: record.merkle_root,
};
Ok(event)
}
fn sanitize_aggregate(&self, aggregate: crate::store::AggregateState) -> Aggregate {
let filtered = self.state.sanitize_aggregate(aggregate);
Aggregate {
aggregate_type: filtered.aggregate_type,
aggregate_id: filtered.aggregate_id,
version: filtered.version,
state: filtered.state.into_iter().collect::<HashMap<_, _>>(),
merkle_root: filtered.merkle_root,
archived: filtered.archived,
}
}
}
#[tonic::async_trait]
impl EventService for GrpcApi {
async fn health(
&self,
_request: Request<HealthRequest>,
) -> Result<Response<HealthResponse>, Status> {
Ok(Response::new(HealthResponse {
status: "ok".to_string(),
}))
}
async fn append_event(
&self,
request: Request<AppendEventRequest>,
) -> Result<Response<AppendEventResponse>, Status> {
let metadata = request.metadata();
let token = Self::extract_token(metadata)?;
let payload = request.into_inner();
let payload_value: Value = serde_json::from_str(&payload.payload_json)
.map_err(|err| Status::invalid_argument(format!("invalid payload_json: {}", err)))?;
let claims = self
.state
.tokens
.authorize(&token, AccessKind::Write)
.map_err(Self::map_error)?
.into();
let payload_map = store::payload_to_map(&payload_value);
if self.state.restrict {
if let Err(err) = self.state.schemas.validate_event(
&payload.aggregate_type,
&payload.event_type,
&payload_map,
) {
return Err(Self::map_error(err));
}
}
let record = self
.state
.store
.append(AppendEvent {
aggregate_type: payload.aggregate_type,
aggregate_id: payload.aggregate_id,
event_type: payload.event_type,
payload: payload_value,
issued_by: Some(claims),
})
.map_err(Self::map_error)?;
notify_plugins(&self.state, &record);
replicate_events(&self.state, std::slice::from_ref(&record));
let event = Self::convert_event(record)?;
Ok(Response::new(AppendEventResponse { event: Some(event) }))
}
async fn list_aggregates(
&self,
request: Request<ListAggregatesRequest>,
) -> Result<Response<ListAggregatesResponse>, Status> {
let params = request.into_inner();
let skip = params.skip as usize;
let mut take = if params.take == 0 {
self.state.list_page_size
} else {
params.take as usize
};
if take == 0 {
return Ok(Response::new(ListAggregatesResponse {
aggregates: Vec::new(),
}));
}
if take > self.state.page_limit {
take = self.state.page_limit;
}
let mut aggregates = self.state.store.aggregates();
aggregates.retain(|aggregate| !self.state.is_hidden_aggregate(&aggregate.aggregate_type));
let page = aggregates
.into_iter()
.skip(skip)
.take(take)
.map(|aggregate| self.sanitize_aggregate(aggregate))
.collect();
Ok(Response::new(ListAggregatesResponse { aggregates: page }))
}
async fn get_aggregate(
&self,
request: Request<GetAggregateRequest>,
) -> Result<Response<GetAggregateResponse>, Status> {
let params = request.into_inner();
if self.state.is_hidden_aggregate(¶ms.aggregate_type) {
return Err(Status::not_found("aggregate not found"));
}
let aggregate = self
.state
.store
.get_aggregate_state(¶ms.aggregate_type, ¶ms.aggregate_id)
.map_err(Self::map_error)?;
let aggregate = self.sanitize_aggregate(aggregate);
Ok(Response::new(GetAggregateResponse {
aggregate: Some(aggregate),
}))
}
async fn list_events(
&self,
request: Request<ListEventsRequest>,
) -> Result<Response<ListEventsResponse>, Status> {
let params = request.into_inner();
if self.state.is_hidden_aggregate(¶ms.aggregate_type) {
return Err(Status::not_found("aggregate not found"));
}
let skip = params.skip as usize;
let mut take = if params.take == 0 {
self.state.page_limit
} else {
params.take as usize
};
if take == 0 {
return Ok(Response::new(ListEventsResponse { events: Vec::new() }));
}
if take > self.state.page_limit {
take = self.state.page_limit;
}
let events = self
.state
.store
.list_events(¶ms.aggregate_type, ¶ms.aggregate_id)
.map_err(Self::map_error)?;
let page = events
.into_iter()
.skip(skip)
.take(take)
.map(Self::convert_event)
.collect::<Result<Vec<_>, _>>()?;
Ok(Response::new(ListEventsResponse { events: page }))
}
async fn verify_aggregate(
&self,
request: Request<VerifyAggregateRequest>,
) -> Result<Response<VerifyAggregateResponse>, Status> {
let params = request.into_inner();
if self.state.is_hidden_aggregate(¶ms.aggregate_type) {
return Err(Status::not_found("aggregate not found"));
}
let merkle_root = self
.state
.store
.verify(¶ms.aggregate_type, ¶ms.aggregate_id)
.map_err(Self::map_error)?;
Ok(Response::new(VerifyAggregateResponse { merkle_root }))
}
}
impl From<EventError> for Status {
fn from(value: EventError) -> Self {
GrpcApi::map_error(value)
}
}