eventdbx 3.2.1

An event-sourced, key-value, write-side database system.
Documentation
use std::{
    io::{BufRead, BufReader, BufWriter, Write},
    path::Path,
    process::{Child, ChildStdin, Command, Stdio},
    sync::Arc,
};

use capnp::message::Builder;
use capnp::serialize;
use parking_lot::Mutex;
use serde_json;
use tracing::{debug, info, warn};

use crate::config::ProcessPluginConfig;
use crate::error::{EventError, Result};
use crate::plugin::registry::{InstalledPluginRecord, load_registry, registry_path};
use crate::plugin_capnp;
use crate::schema::AggregateSchema;
use crate::store::{AggregateState, EventRecord};

use super::{Plugin, PluginDelivery};

const CHANNEL_LABEL_STDOUT: &str = "stdout";
const CHANNEL_LABEL_STDERR: &str = "stderr";

pub struct ProcessPlugin {
    connection: Arc<ProcessConnection>,
}

impl ProcessPlugin {
    pub fn new(identifier: String, config: ProcessPluginConfig, data_dir: &Path) -> Result<Self> {
        let registry_path = registry_path(data_dir);
        let registry = load_registry(&registry_path)?;
        let target = current_target_triple();
        let record = registry
            .iter()
            .find(|entry| {
                entry.name == config.name
                    && entry.version == config.version
                    && entry.target == target
            })
            .cloned()
            .ok_or_else(|| {
                EventError::Config(format!(
                    "plugin {}@{} is not installed for target {}",
                    config.name, config.version, target
                ))
            })?;

        let connection = ProcessConnection::new(identifier, config, record)?;

        Ok(Self {
            connection: Arc::new(connection),
        })
    }
}

struct ProcessConnection {
    identifier: String,
    config: ProcessPluginConfig,
    record: InstalledPluginRecord,
    inner: Mutex<Option<ProcessInner>>,
}

struct ProcessInner {
    child: Child,
    writer: BufWriter<ChildStdin>,
    sequence: u64,
}

impl ProcessConnection {
    fn new(
        identifier: String,
        config: ProcessPluginConfig,
        record: InstalledPluginRecord,
    ) -> Result<Self> {
        let connection = Self {
            identifier,
            config,
            record,
            inner: Mutex::new(None),
        };
        // lazy spawn on first use
        Ok(connection)
    }

    fn send_event(
        &self,
        record: &EventRecord,
        state: &AggregateState,
        schema: Option<&AggregateSchema>,
    ) -> Result<()> {
        let mut guard = self.inner.lock();
        self.ensure_running(&mut guard)?;
        if guard.is_none() {
            self.restart(&mut guard)?;
        }

        let inner = guard
            .as_mut()
            .ok_or_else(|| EventError::Storage("plugin process failed to start".into()))?;

        if let Err(err) = self.write_event(inner, record, state, schema) {
            warn!(
                target: "eventdbx.plugin",
                "failed to deliver event to plugin {} ({}), attempting restart: {}",
                self.identifier,
                self.config.name,
                err
            );
            self.restart(&mut guard)?;
            let inner = guard
                .as_mut()
                .ok_or_else(|| EventError::Storage("plugin process failed to restart".into()))?;
            self.write_event(inner, record, state, schema)?;
        }
        Ok(())
    }

    fn ensure_running(&self, guard: &mut Option<ProcessInner>) -> Result<()> {
        if let Some(inner) = guard.as_mut() {
            if let Ok(Some(status)) = inner.child.try_wait() {
                warn!(
                    target: "eventdbx.plugin",
                    "plugin {} exited with status {}",
                    self.identifier,
                    status
                );
                *guard = None;
            }
        }
        Ok(())
    }

    fn restart(&self, guard: &mut Option<ProcessInner>) -> Result<()> {
        let mut inner = self.spawn_process()?;
        self.write_init(&mut inner)?;
        *guard = Some(inner);
        Ok(())
    }

    fn spawn_process(&self) -> Result<ProcessInner> {
        let mut command = Command::new(&self.record.binary_path);
        if !self.config.args.is_empty() {
            command.args(&self.config.args);
        }
        for (key, value) in &self.config.env {
            command.env(key, value);
        }
        if let Some(dir) = &self.config.working_dir {
            command.current_dir(dir);
        } else {
            command.current_dir(&self.record.install_dir);
        }
        command
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped());

        let mut child = command.spawn().map_err(|err| {
            EventError::Storage(format!(
                "failed to launch plugin {}: {}",
                self.identifier, err
            ))
        })?;

        let stdin = child.stdin.take().ok_or_else(|| {
            EventError::Storage(format!("plugin {} did not provide stdin", self.identifier))
        })?;

        if let Some(stdout) = child.stdout.take() {
            spawn_stream_logger(self.identifier.clone(), CHANNEL_LABEL_STDOUT, stdout);
        }

        if let Some(stderr) = child.stderr.take() {
            spawn_stream_logger(self.identifier.clone(), CHANNEL_LABEL_STDERR, stderr);
        }

        let writer = BufWriter::new(stdin);

        Ok(ProcessInner {
            child,
            writer,
            sequence: 0,
        })
    }

    fn write_init(&self, inner: &mut ProcessInner) -> Result<()> {
        let mut message = Builder::new_default();
        {
            let mut envelope = message.init_root::<plugin_capnp::plugin_envelope::Builder>();
            let mut message_builder = envelope.reborrow().init_message();
            let mut init = message_builder.reborrow().init_init();
            init.set_plugin_name(&self.config.name);
            init.set_version(&self.config.version);
            init.set_target(&self.record.target);
        }
        serialize::write_message(&mut inner.writer, &message)
            .map_err(|err| EventError::Serialization(err.to_string()))?;
        inner.writer.flush()?;
        Ok(())
    }

    fn write_event(
        &self,
        inner: &mut ProcessInner,
        record: &EventRecord,
        state: &AggregateState,
        schema: Option<&AggregateSchema>,
    ) -> Result<()> {
        let event_id = record.metadata.event_id.to_string();
        let payload_json = serde_json::to_string(&record.payload)?;
        let metadata_json = serde_json::to_string(&record.metadata)?;
        let schema_json = match schema {
            Some(schema) => Some(serde_json::to_string(schema)?),
            None => None,
        };
        let extensions_json = match &record.extensions {
            Some(value) => Some(serde_json::to_string(value)?),
            None => None,
        };

        let mut message = Builder::new_default();
        {
            let mut envelope = message.init_root::<plugin_capnp::plugin_envelope::Builder>();
            let mut union_builder = envelope.reborrow().init_message();
            let mut event = union_builder.reborrow().init_event();
            event.set_sequence(inner.sequence);
            event.set_aggregate_type(&record.aggregate_type);
            event.set_aggregate_id(&record.aggregate_id);
            event.set_event_type(&record.event_type);
            event.set_event_version(record.version);
            event.set_event_id(&event_id);
            event.set_created_at_epoch_micros(record.metadata.created_at.timestamp_micros());
            event.set_payload_json(&payload_json);
            event.set_metadata_json(&metadata_json);
            match extensions_json {
                Some(ref json) => event.set_extensions_json(json),
                None => event.set_extensions_json("null"),
            }
            event.set_hash(&record.hash);
            event.set_merkle_root(&record.merkle_root);

            event.set_state_version(state.version);
            event.set_state_archived(state.archived);
            event.set_state_merkle_root(&state.merkle_root);

            match schema_json {
                Some(ref json) => event.set_schema_json(json),
                None => event.set_schema_json("null"),
            }

            let mut entries = event.init_state_entries(state.state.len() as u32);
            for (idx, (key, value)) in state.state.iter().enumerate() {
                let mut entry = entries.reborrow().get(idx as u32);
                entry.set_key(key);
                entry.set_value(value);
            }
        }

        serialize::write_message(&mut inner.writer, &message)
            .map_err(|err| EventError::Serialization(err.to_string()))?;
        inner.writer.flush()?;
        inner.sequence = inner.sequence.wrapping_add(1);
        Ok(())
    }
}

impl Plugin for ProcessPlugin {
    fn name(&self) -> &'static str {
        "process"
    }

    fn notify_event(&self, delivery: PluginDelivery<'_>) -> Result<()> {
        let Some(record) = delivery.record else {
            return Ok(());
        };
        let Some(state) = delivery.state else {
            return Ok(());
        };
        self.connection.send_event(record, state, delivery.schema)
    }
}

fn spawn_stream_logger<R>(identifier: String, channel: &'static str, reader: R)
where
    R: std::io::Read + Send + 'static,
{
    std::thread::spawn(move || {
        let mut buf_reader = BufReader::new(reader);
        let mut line = String::new();
        loop {
            line.clear();
            match buf_reader.read_line(&mut line) {
                Ok(0) => break,
                Ok(_) => {
                    let trimmed = line.trim_end_matches(&['\r', '\n'][..]);
                    if !trimmed.is_empty() {
                        info!(
                            target: "eventdbx.plugin",
                            "{} {}: {}",
                            identifier,
                            channel,
                            trimmed
                        );
                    }
                }
                Err(err) => {
                    debug!(
                        target: "eventdbx.plugin",
                        "error reading {} from plugin {}: {}",
                        channel,
                        identifier,
                        err
                    );
                    break;
                }
            }
        }
    });
}

fn current_target_triple() -> String {
    format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH)
}