eventdbx 3.9.12

An event-sourced, nosql, write-side database system.
Documentation
use std::{path::Path, sync::Arc};

use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use rocksdb::{DBWithThreadMode, Direction, IteratorMode, MultiThreaded, Options};
use serde::{Deserialize, Serialize};

use crate::{
    conflict::{ConflictKind, ReplicationConflict},
    error::{EventError, Result},
    snowflake::{SnowflakeGenerator, SnowflakeId},
    store::EventRecord,
};

const SEP: u8 = 0x1F;
const PREFIX_CONFLICT: &[u8] = b"conflict";

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictStatus {
    Pending,
    Resolved,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ConflictResolutionMode {
    AppliedIncoming,
    AppendedEvent,
    PatchedEvent,
    Dismissed,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictResolution {
    pub mode: ConflictResolutionMode,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_id: Option<SnowflakeId>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub event_type: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub comment: Option<String>,
    pub resolved_at: DateTime<Utc>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConflictEntry {
    pub id: u64,
    pub aggregate_type: String,
    pub aggregate_id: String,
    pub kind: ConflictKind,
    pub incoming: EventRecord,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub existing: Option<EventRecord>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub remote: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub note: Option<String>,
    pub status: ConflictStatus,
    pub created_at: DateTime<Utc>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resolved: Option<ConflictResolution>,
}

impl ConflictEntry {
    fn new(id: u64, conflict: ReplicationConflict, remote: Option<String>) -> Self {
        Self {
            id,
            aggregate_type: conflict.aggregate_type,
            aggregate_id: conflict.aggregate_id,
            kind: conflict.kind,
            incoming: conflict.incoming,
            existing: conflict.existing,
            remote,
            note: None,
            status: ConflictStatus::Pending,
            created_at: conflict.observed_at.unwrap_or_else(|| Utc::now()),
            resolved: None,
        }
    }
}

#[derive(Clone)]
pub struct ConflictStore {
    db: Arc<DBWithThreadMode<MultiThreaded>>,
    id_generator: Arc<Mutex<SnowflakeGenerator>>,
}

impl ConflictStore {
    pub fn open(path: &Path) -> Result<Self> {
        let mut options = Options::default();
        options.create_if_missing(true);
        let db = DBWithThreadMode::<MultiThreaded>::open(&options, path)
            .map_err(|err| EventError::Storage(err.to_string()))?;
        Ok(Self {
            db: Arc::new(db),
            id_generator: Arc::new(Mutex::new(SnowflakeGenerator::new(0))),
        })
    }

    pub fn record(
        &self,
        conflict: ReplicationConflict,
        remote: Option<String>,
    ) -> Result<ConflictEntry> {
        let id = self.next_id().as_u64();
        let entry = ConflictEntry::new(id, conflict, remote);
        let encoded = serde_json::to_vec(&entry)?;
        self.db
            .put(conflict_key(id), encoded)
            .map_err(|err| EventError::Storage(err.to_string()))?;
        Ok(entry)
    }

    pub fn update_note(&self, id: u64, note: Option<String>) -> Result<ConflictEntry> {
        self.update_entry(id, move |entry| {
            entry.note = note.clone();
        })
    }

    pub fn resolve(&self, id: u64, resolution: ConflictResolution) -> Result<ConflictEntry> {
        self.update_entry(id, move |entry| {
            entry.status = ConflictStatus::Resolved;
            entry.resolved = Some(resolution.clone());
        })
    }

    pub fn get(&self, id: u64) -> Result<Option<ConflictEntry>> {
        match self
            .db
            .get(conflict_key(id))
            .map_err(|err| EventError::Storage(err.to_string()))?
        {
            Some(bytes) => {
                let entry = serde_json::from_slice(&bytes)?;
                Ok(Some(entry))
            }
            None => Ok(None),
        }
    }

    pub fn list(
        &self,
        status: Option<ConflictStatus>,
        limit: Option<usize>,
    ) -> Result<Vec<ConflictEntry>> {
        let iter = self.db.iterator(IteratorMode::From(
            conflict_prefix().as_slice(),
            Direction::Forward,
        ));
        let mut entries = Vec::new();
        for item in iter {
            let (key, value) = item.map_err(|err| EventError::Storage(err.to_string()))?;
            if !key.starts_with(conflict_prefix().as_slice()) {
                break;
            }
            let entry: ConflictEntry = serde_json::from_slice(&value)?;
            if status.map_or(true, |desired| entry.status == desired) {
                entries.push(entry);
                if let Some(cap) = limit {
                    if entries.len() >= cap {
                        break;
                    }
                }
            }
        }
        Ok(entries)
    }

    fn update_entry<F>(&self, id: u64, mut transform: F) -> Result<ConflictEntry>
    where
        F: FnMut(&mut ConflictEntry),
    {
        let key = conflict_key(id);
        let Some(bytes) = self
            .db
            .get(&key)
            .map_err(|err| EventError::Storage(err.to_string()))?
        else {
            return Err(EventError::Storage(format!("conflict {} not found", id)));
        };
        let mut entry: ConflictEntry = serde_json::from_slice(&bytes)?;
        transform(&mut entry);
        let encoded = serde_json::to_vec(&entry)?;
        self.db
            .put(key, encoded)
            .map_err(|err| EventError::Storage(err.to_string()))?;
        Ok(entry)
    }

    pub fn next_id(&self) -> SnowflakeId {
        let mut guard = self.id_generator.lock();
        guard.next_id()
    }
}

fn conflict_key(id: u64) -> Vec<u8> {
    let mut key = conflict_prefix();
    key.push(SEP);
    key.extend_from_slice(&id.to_be_bytes());
    key
}

fn conflict_prefix() -> Vec<u8> {
    PREFIX_CONFLICT.to_vec()
}