reddb-io-server 1.2.0

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
async fn graph_projections(
    &self,
    request: Request<Empty>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_read(request.metadata())?;
    let projections = self.catalog_use_cases().graph_projections().map_err(to_status)?;
    Ok(Response::new(json_payload_reply(
        crate::presentation::admin_json::graph_projections_json(&projections),
    )))
}

async fn declared_graph_projections(
    &self,
    request: Request<Empty>,
) -> Result<Response<PayloadReply>, Status> {
    self.graph_projections(request).await
}

async fn operational_graph_projections(
    &self,
    request: Request<Empty>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_read(request.metadata())?;
    Ok(Response::new(json_payload_reply(
        crate::presentation::admin_json::graph_projections_json(
            &self.catalog_use_cases().operational_graph_projections(),
        ),
    )))
}

async fn graph_projection_statuses(
    &self,
    request: Request<Empty>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_read(request.metadata())?;
    Ok(Response::new(json_payload_reply(
        crate::presentation::catalog_json::catalog_graph_projection_statuses_json(
            &self.catalog_use_cases().graph_projection_statuses(),
        ),
    )))
}

async fn graph_projection_attention(
    &self,
    request: Request<Empty>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_read(request.metadata())?;
    Ok(Response::new(json_payload_reply(
        crate::presentation::catalog_json::catalog_graph_projection_attention_json(
            &self.catalog_use_cases().graph_projection_attention(),
        ),
    )))
}

async fn save_graph_projection(
    &self,
    request: Request<GraphProjectionUpsertRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let request = request.into_inner();
    let input = crate::application::admin_payload::finalize_graph_projection_upsert_input(
        request.name,
        crate::application::admin_payload::graph_projection_from_parts(
            request.node_labels,
            request.node_types,
            request.edge_labels,
        ),
        Some(request.source),
        "graph projection requires at least one of node_labels, node_types or edge_labels",
    )
    .map_err(to_status)?;

    let saved = self
        .admin_use_cases()
        .save_graph_projection(input.name, input.projection, input.source)
        .map_err(to_status)?;
    Ok(Response::new(json_payload_reply(
        crate::presentation::admin_json::graph_projection_json(&saved),
    )))
}

async fn materialize_graph_projection(
    &self,
    request: Request<IndexNameRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let request = request.into_inner();
    if request.name.trim().is_empty() {
        return Err(Status::invalid_argument(
            "graph projection name cannot be empty",
        ));
    }
    self.admin_use_cases()
        .mark_graph_projection_materializing(request.name.trim())
        .map_err(to_status)?;
    let projection = match self
        .admin_use_cases()
        .materialize_graph_projection(request.name.trim())
    {
        Ok(projection) => projection,
        Err(err) => {
            let _ = self.admin_use_cases().fail_graph_projection(request.name.trim());
            return Err(to_status(err));
        }
    };
    Ok(Response::new(json_payload_reply(
        crate::presentation::admin_json::graph_projection_json(&projection),
    )))
}

async fn mark_graph_projection_materializing(
    &self,
    request: Request<IndexNameRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let request = request.into_inner();
    if request.name.trim().is_empty() {
        return Err(Status::invalid_argument(
            "graph projection name cannot be empty",
        ));
    }
    let projection = self
        .admin_use_cases()
        .mark_graph_projection_materializing(request.name.trim())
        .map_err(to_status)?;
    Ok(Response::new(json_payload_reply(
        crate::presentation::admin_json::graph_projection_json(&projection),
    )))
}

async fn fail_graph_projection(
    &self,
    request: Request<IndexNameRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let request = request.into_inner();
    if request.name.trim().is_empty() {
        return Err(Status::invalid_argument(
            "graph projection name cannot be empty",
        ));
    }
    let projection = self
        .admin_use_cases()
        .fail_graph_projection(request.name.trim())
        .map_err(to_status)?;
    Ok(Response::new(json_payload_reply(
        crate::presentation::admin_json::graph_projection_json(&projection),
    )))
}

async fn mark_graph_projection_stale(
    &self,
    request: Request<IndexNameRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let request = request.into_inner();
    if request.name.trim().is_empty() {
        return Err(Status::invalid_argument(
            "graph projection name cannot be empty",
        ));
    }
    let projection = self
        .admin_use_cases()
        .mark_graph_projection_stale(request.name.trim())
        .map_err(to_status)?;
    Ok(Response::new(json_payload_reply(
        crate::presentation::admin_json::graph_projection_json(&projection),
    )))
}

async fn save_analytics_job(
    &self,
    request: Request<JsonPayloadRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let payload = parse_json_payload(&request.into_inner().payload_json)?;
    let input = crate::application::admin_payload::parse_analytics_job_mutation_input(&payload)
        .map_err(to_status)?;
    let job = self
        .admin_use_cases()
        .save_analytics_job(input.kind, input.projection, input.metadata)
        .map_err(to_status)?;
    Ok(Response::new(json_payload_reply(analytics_job_json(&job))))
}

async fn queue_analytics_job(
    &self,
    request: Request<JsonPayloadRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let payload = parse_json_payload(&request.into_inner().payload_json)?;
    let input = crate::application::admin_payload::parse_analytics_job_mutation_input(&payload)
        .map_err(to_status)?;
    let job = self
        .admin_use_cases()
        .queue_analytics_job(input.kind, input.projection, input.metadata)
        .map_err(to_status)?;
    Ok(Response::new(json_payload_reply(analytics_job_json(&job))))
}

async fn start_analytics_job(
    &self,
    request: Request<JsonPayloadRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let payload = parse_json_payload(&request.into_inner().payload_json)?;
    let input = crate::application::admin_payload::parse_analytics_job_mutation_input(&payload)
        .map_err(to_status)?;
    let job = self
        .admin_use_cases()
        .start_analytics_job(input.kind, input.projection, input.metadata)
        .map_err(to_status)?;
    Ok(Response::new(json_payload_reply(analytics_job_json(&job))))
}

async fn complete_analytics_job(
    &self,
    request: Request<JsonPayloadRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let payload = parse_json_payload(&request.into_inner().payload_json)?;
    let input = crate::application::admin_payload::parse_analytics_job_mutation_input(&payload)
        .map_err(to_status)?;
    let job = self
        .admin_use_cases()
        .complete_analytics_job(input.kind, input.projection, input.metadata)
        .map_err(to_status)?;
    Ok(Response::new(json_payload_reply(analytics_job_json(&job))))
}

async fn fail_analytics_job(
    &self,
    request: Request<JsonPayloadRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let payload = parse_json_payload(&request.into_inner().payload_json)?;
    let input = crate::application::admin_payload::parse_analytics_job_mutation_input(&payload)
        .map_err(to_status)?;
    let job = self
        .admin_use_cases()
        .fail_analytics_job(input.kind, input.projection, input.metadata)
        .map_err(to_status)?;
    Ok(Response::new(json_payload_reply(analytics_job_json(&job))))
}

async fn mark_analytics_job_stale(
    &self,
    request: Request<JsonPayloadRequest>,
) -> Result<Response<PayloadReply>, Status> {
    self.authorize_write(request.metadata())?;
    let payload = parse_json_payload(&request.into_inner().payload_json)?;
    let input = crate::application::admin_payload::parse_analytics_job_mutation_input(&payload)
        .map_err(to_status)?;
    let job = self
        .admin_use_cases()
        .mark_analytics_job_stale(input.kind, input.projection, input.metadata)
        .map_err(to_status)?;
    Ok(Response::new(json_payload_reply(analytics_job_json(&job))))
}