kitedb 0.2.18

High-performance embedded graph database
Documentation
//! Replication token/cursor types.

use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt;
use std::str::FromStr;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub enum ReplicationRole {
  #[default]
  Disabled,
  Primary,
  Replica,
}

impl fmt::Display for ReplicationRole {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    let value = match self {
      ReplicationRole::Disabled => "disabled",
      ReplicationRole::Primary => "primary",
      ReplicationRole::Replica => "replica",
    };
    write!(f, "{value}")
  }
}

impl FromStr for ReplicationRole {
  type Err = ReplicationParseError;

  fn from_str(raw: &str) -> Result<Self, Self::Err> {
    match raw {
      "disabled" => Ok(Self::Disabled),
      "primary" => Ok(Self::Primary),
      "replica" => Ok(Self::Replica),
      _ => Err(ReplicationParseError::new(format!(
        "invalid replication role: {raw}"
      ))),
    }
  }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicationParseError {
  message: String,
}

impl ReplicationParseError {
  fn new(message: impl Into<String>) -> Self {
    Self {
      message: message.into(),
    }
  }
}

impl fmt::Display for ReplicationParseError {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    write!(f, "{}", self.message)
  }
}

impl std::error::Error for ReplicationParseError {}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct CommitToken {
  pub epoch: u64,
  pub log_index: u64,
}

impl CommitToken {
  pub const fn new(epoch: u64, log_index: u64) -> Self {
    Self { epoch, log_index }
  }
}

impl fmt::Display for CommitToken {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    write!(f, "{}:{}", self.epoch, self.log_index)
  }
}

impl Ord for CommitToken {
  fn cmp(&self, other: &Self) -> Ordering {
    self
      .epoch
      .cmp(&other.epoch)
      .then_with(|| self.log_index.cmp(&other.log_index))
  }
}

impl PartialOrd for CommitToken {
  fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
    Some(self.cmp(other))
  }
}

impl FromStr for CommitToken {
  type Err = ReplicationParseError;

  fn from_str(raw: &str) -> Result<Self, Self::Err> {
    let mut parts = raw.split(':');
    let epoch = parse_u64_component(parts.next(), "epoch", raw)?;
    let log_index = parse_u64_component(parts.next(), "log_index", raw)?;

    if parts.next().is_some() {
      return Err(ReplicationParseError::new(format!(
        "invalid token format: {raw}"
      )));
    }

    Ok(Self::new(epoch, log_index))
  }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ReplicationCursor {
  pub epoch: u64,
  pub segment_id: u64,
  pub segment_offset: u64,
  pub log_index: u64,
}

impl ReplicationCursor {
  pub const fn new(epoch: u64, segment_id: u64, segment_offset: u64, log_index: u64) -> Self {
    Self {
      epoch,
      segment_id,
      segment_offset,
      log_index,
    }
  }
}

impl fmt::Display for ReplicationCursor {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    write!(
      f,
      "{}:{}:{}:{}",
      self.epoch, self.segment_id, self.segment_offset, self.log_index
    )
  }
}

impl Ord for ReplicationCursor {
  fn cmp(&self, other: &Self) -> Ordering {
    self
      .epoch
      .cmp(&other.epoch)
      .then_with(|| self.log_index.cmp(&other.log_index))
      .then_with(|| self.segment_id.cmp(&other.segment_id))
      .then_with(|| self.segment_offset.cmp(&other.segment_offset))
  }
}

impl PartialOrd for ReplicationCursor {
  fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
    Some(self.cmp(other))
  }
}

impl FromStr for ReplicationCursor {
  type Err = ReplicationParseError;

  fn from_str(raw: &str) -> Result<Self, Self::Err> {
    let mut parts = raw.split(':');

    let epoch = parse_u64_component(parts.next(), "epoch", raw)?;
    let segment_id = parse_u64_component(parts.next(), "segment_id", raw)?;
    let segment_offset = parse_u64_component(parts.next(), "segment_offset", raw)?;
    let log_index = parse_u64_component(parts.next(), "log_index", raw)?;

    if parts.next().is_some() {
      return Err(ReplicationParseError::new(format!(
        "invalid cursor format: {raw}"
      )));
    }

    Ok(Self::new(epoch, segment_id, segment_offset, log_index))
  }
}

fn parse_u64_component(
  value: Option<&str>,
  component: &'static str,
  original: &str,
) -> Result<u64, ReplicationParseError> {
  let value = value.ok_or_else(|| {
    ReplicationParseError::new(format!(
      "invalid replication identifier ({component} missing): {original}"
    ))
  })?;

  if value.is_empty() || !value.bytes().all(|byte| byte.is_ascii_digit()) {
    return Err(ReplicationParseError::new(format!(
      "invalid {component}: {value}"
    )));
  }

  value.parse::<u64>().map_err(|_| {
    ReplicationParseError::new(format!(
      "invalid replication identifier ({component} overflow): {original}"
    ))
  })
}

#[cfg(test)]
mod tests {
  use super::{CommitToken, ReplicationCursor};
  use rand::{rngs::StdRng, Rng, SeedableRng};
  use std::str::FromStr;

  #[test]
  fn token_roundtrip_fuzz_like() {
    let mut rng = StdRng::seed_from_u64(0xdecafbad);

    for _ in 0..2_000 {
      let token = CommitToken::new(rng.gen_range(0..10_000), rng.gen_range(0..10_000_000));
      let parsed = CommitToken::from_str(&token.to_string()).expect("parse token");
      assert_eq!(parsed, token);
    }
  }

  #[test]
  fn cursor_roundtrip_fuzz_like() {
    let mut rng = StdRng::seed_from_u64(0xabba_cafe);

    for _ in 0..2_000 {
      let cursor = ReplicationCursor::new(
        rng.gen_range(0..1024),
        rng.gen_range(0..4096),
        rng.gen_range(0..1_000_000),
        rng.gen_range(0..10_000_000),
      );

      let parsed = ReplicationCursor::from_str(&cursor.to_string()).expect("parse cursor");
      assert_eq!(parsed, cursor);
    }
  }
}