eventdbx 1.5.5

An event-sourced, key-value, write-side database system.
Documentation
use parking_lot::Mutex;
use tokio::runtime::Builder;
use tokio_stream::iter;
use tonic::{Request, transport::Channel};

use crate::{
    config::GrpcPluginConfig,
    error::{EventError, Result},
    replication::{
        convert_event, normalize_endpoint,
        proto::{EventBatch, replication_client::ReplicationClient},
    },
    schema::AggregateSchema,
    store::{AggregateState, EventRecord},
};

use super::Plugin;

/// Plugin that forwards event records to a remote replication service over gRPC.
///
/// The plugin exposes a synchronous API; asynchronous gRPC calls are driven to
/// completion on the runtime owned by this struct.
pub struct GrpcPlugin {
    /// Plugin configuration; `endpoint` is read from it whenever a connection is opened.
    config: GrpcPluginConfig,
    /// Runtime used to block on the async connect / send futures.
    runtime: tokio::runtime::Runtime,
    /// Cached replication client. `None` until a connection has been established
    /// by `notify_event`, and reset to `None` after a failed send.
    client: Mutex<Option<ReplicationClient<Channel>>>,
}

impl GrpcPlugin {
    /// Creates a plugin for the given configuration.
    ///
    /// No connection is attempted here; the client cache starts out empty.
    ///
    /// # Panics
    ///
    /// Panics if the current-thread Tokio runtime cannot be built.
    pub fn new(config: GrpcPluginConfig) -> Self {
        let mut builder = Builder::new_current_thread();
        builder.enable_all();
        let runtime = builder.build().expect("failed to build gRPC runtime");

        Self {
            config,
            runtime,
            client: Mutex::new(None),
        }
    }

    /// Checks that the configured endpoint can be connected to.
    ///
    /// The connection opened for the check is dropped immediately; it is not
    /// stored in the client cache.
    ///
    /// # Errors
    ///
    /// Returns whatever error `connect` produces for the configured endpoint.
    pub fn ensure_ready(&self) -> Result<()> {
        // The future is lazy: nothing runs until `block_on` polls it.
        let probe = connect(&self.config.endpoint);
        self.runtime.block_on(probe).map(|_client| ())
    }
}

impl Plugin for GrpcPlugin {
    fn name(&self) -> &'static str {
        "grpc"
    }

    fn notify_event(
        &self,
        record: &EventRecord,
        _state: &AggregateState,
        _schema: Option<&AggregateSchema>,
    ) -> Result<()> {
        let batch = EventBatch {
            sequence: record.version,
            events: vec![convert_event(record)?],
        };
        let endpoint = self.config.endpoint.clone();

        self.runtime.block_on(async {
            let mut guard = self.client.lock();
            if guard.is_none() {
                let client = connect(&endpoint).await?;
                *guard = Some(client);
            }

            if let Some(client) = guard.as_mut() {
                if let Err(err) = client
                    .apply_events(Request::new(iter(vec![batch.clone()])))
                    .await
                {
                    *guard = None;
                    return Err(EventError::Storage(err.to_string()));
                }
                Ok(())
            } else {
                Err(EventError::Storage(
                    "grpc client unavailable after initialization".into(),
                ))
            }
        })
    }
}

/// Opens a gRPC channel to `endpoint` and wraps it in a replication client.
///
/// The address is first passed through `normalize_endpoint`.
///
/// # Errors
///
/// * Any error returned by `normalize_endpoint`.
/// * `EventError::Config` when the normalized address cannot be turned into a
///   tonic `Endpoint`.
/// * `EventError::Storage` when establishing the connection fails.
async fn connect(endpoint: &str) -> Result<ReplicationClient<Channel>> {
    let target = normalize_endpoint(endpoint)?;
    let builder = tonic::transport::Endpoint::try_from(target)
        .map_err(|err| EventError::Config(err.to_string()))?;

    match builder.connect().await {
        Ok(channel) => Ok(ReplicationClient::new(channel)),
        Err(err) => Err(EventError::Storage(err.to_string())),
    }
}