use super::maintenance::CompressionOptions;
use crate::api::kite::{
KiteOptions as RustKiteOptions, KiteRuntimeProfile as RustKiteRuntimeProfile,
};
use crate::core::single_file::{
SingleFileOpenOptions as RustOpenOptions, SnapshotParseMode as RustSnapshotParseMode,
SyncMode as RustSyncMode,
};
use crate::replication::types::ReplicationRole;
use crate::types::{CacheOptions, PropertyCacheConfig, QueryCacheConfig, TraversalCacheConfig};
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use std::str::FromStr;
#[pyclass(name = "SyncMode")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SyncMode {
pub(crate) mode: RustSyncMode,
}
#[pymethods]
impl SyncMode {
#[staticmethod]
fn full() -> Self {
Self {
mode: RustSyncMode::Full,
}
}
#[staticmethod]
fn normal() -> Self {
Self {
mode: RustSyncMode::Normal,
}
}
#[staticmethod]
fn off() -> Self {
Self {
mode: RustSyncMode::Off,
}
}
fn __repr__(&self) -> String {
match self.mode {
RustSyncMode::Full => "SyncMode.full()".to_string(),
RustSyncMode::Normal => "SyncMode.normal()".to_string(),
RustSyncMode::Off => "SyncMode.off()".to_string(),
}
}
}
#[pyclass(name = "SnapshotParseMode")]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SnapshotParseMode {
pub(crate) mode: RustSnapshotParseMode,
}
#[pymethods]
impl SnapshotParseMode {
#[staticmethod]
fn strict() -> Self {
Self {
mode: RustSnapshotParseMode::Strict,
}
}
#[staticmethod]
fn salvage() -> Self {
Self {
mode: RustSnapshotParseMode::Salvage,
}
}
fn __repr__(&self) -> String {
match self.mode {
RustSnapshotParseMode::Strict => "SnapshotParseMode.strict()".to_string(),
RustSnapshotParseMode::Salvage => "SnapshotParseMode.salvage()".to_string(),
}
}
}
#[pyclass(name = "OpenOptions")]
#[derive(Debug, Clone, Default)]
pub struct OpenOptions {
#[pyo3(get, set)]
pub read_only: Option<bool>,
#[pyo3(get, set)]
pub create_if_missing: Option<bool>,
#[pyo3(get, set)]
pub mvcc: Option<bool>,
#[pyo3(get, set)]
pub mvcc_gc_interval_ms: Option<i64>,
#[pyo3(get, set)]
pub mvcc_retention_ms: Option<i64>,
#[pyo3(get, set)]
pub mvcc_max_chain_depth: Option<i64>,
#[pyo3(get, set)]
pub page_size: Option<u32>,
#[pyo3(get, set)]
pub wal_size: Option<u32>,
#[pyo3(get, set)]
pub auto_checkpoint: Option<bool>,
#[pyo3(get, set)]
pub checkpoint_threshold: Option<f64>,
#[pyo3(get, set)]
pub background_checkpoint: Option<bool>,
#[pyo3(get, set)]
pub checkpoint_compression: Option<CompressionOptions>,
#[pyo3(get, set)]
pub cache_snapshot: Option<bool>,
#[pyo3(get, set)]
pub cache_enabled: Option<bool>,
#[pyo3(get, set)]
pub cache_max_node_props: Option<i64>,
#[pyo3(get, set)]
pub cache_max_edge_props: Option<i64>,
#[pyo3(get, set)]
pub cache_max_traversal_entries: Option<i64>,
#[pyo3(get, set)]
pub cache_max_query_entries: Option<i64>,
#[pyo3(get, set)]
pub cache_query_ttl_ms: Option<i64>,
pub sync_mode: Option<SyncMode>,
#[pyo3(get, set)]
pub group_commit_enabled: Option<bool>,
#[pyo3(get, set)]
pub group_commit_window_ms: Option<i64>,
#[pyo3(get, set)]
pub snapshot_parse_mode: Option<SnapshotParseMode>,
#[pyo3(get, set)]
pub replication_role: Option<String>,
#[pyo3(get, set)]
pub replication_sidecar_path: Option<String>,
#[pyo3(get, set)]
pub replication_source_db_path: Option<String>,
#[pyo3(get, set)]
pub replication_source_sidecar_path: Option<String>,
#[pyo3(get, set)]
pub replication_segment_max_bytes: Option<i64>,
#[pyo3(get, set)]
pub replication_retention_min_entries: Option<i64>,
#[pyo3(get, set)]
pub replication_retention_min_ms: Option<i64>,
}
#[pymethods]
impl OpenOptions {
#[new]
#[pyo3(signature = (
read_only=None,
create_if_missing=None,
mvcc=None,
mvcc_gc_interval_ms=None,
mvcc_retention_ms=None,
mvcc_max_chain_depth=None,
page_size=None,
wal_size=None,
auto_checkpoint=None,
checkpoint_threshold=None,
background_checkpoint=None,
checkpoint_compression=None,
cache_snapshot=None,
cache_enabled=None,
cache_max_node_props=None,
cache_max_edge_props=None,
cache_max_traversal_entries=None,
cache_max_query_entries=None,
cache_query_ttl_ms=None,
sync_mode=None,
group_commit_enabled=None,
group_commit_window_ms=None,
snapshot_parse_mode=None,
replication_role=None,
replication_sidecar_path=None,
replication_source_db_path=None,
replication_source_sidecar_path=None,
replication_segment_max_bytes=None,
replication_retention_min_entries=None,
replication_retention_min_ms=None
))]
#[allow(clippy::too_many_arguments)]
fn new(
read_only: Option<bool>,
create_if_missing: Option<bool>,
mvcc: Option<bool>,
mvcc_gc_interval_ms: Option<i64>,
mvcc_retention_ms: Option<i64>,
mvcc_max_chain_depth: Option<i64>,
page_size: Option<u32>,
wal_size: Option<u32>,
auto_checkpoint: Option<bool>,
checkpoint_threshold: Option<f64>,
background_checkpoint: Option<bool>,
checkpoint_compression: Option<CompressionOptions>,
cache_snapshot: Option<bool>,
cache_enabled: Option<bool>,
cache_max_node_props: Option<i64>,
cache_max_edge_props: Option<i64>,
cache_max_traversal_entries: Option<i64>,
cache_max_query_entries: Option<i64>,
cache_query_ttl_ms: Option<i64>,
sync_mode: Option<SyncMode>,
group_commit_enabled: Option<bool>,
group_commit_window_ms: Option<i64>,
snapshot_parse_mode: Option<SnapshotParseMode>,
replication_role: Option<String>,
replication_sidecar_path: Option<String>,
replication_source_db_path: Option<String>,
replication_source_sidecar_path: Option<String>,
replication_segment_max_bytes: Option<i64>,
replication_retention_min_entries: Option<i64>,
replication_retention_min_ms: Option<i64>,
) -> Self {
Self {
read_only,
create_if_missing,
mvcc,
mvcc_gc_interval_ms,
mvcc_retention_ms,
mvcc_max_chain_depth,
page_size,
wal_size,
auto_checkpoint,
checkpoint_threshold,
background_checkpoint,
checkpoint_compression,
cache_snapshot,
cache_enabled,
cache_max_node_props,
cache_max_edge_props,
cache_max_traversal_entries,
cache_max_query_entries,
cache_query_ttl_ms,
sync_mode,
group_commit_enabled,
group_commit_window_ms,
snapshot_parse_mode,
replication_role,
replication_sidecar_path,
replication_source_db_path,
replication_source_sidecar_path,
replication_segment_max_bytes,
replication_retention_min_entries,
replication_retention_min_ms,
}
}
fn __repr__(&self) -> String {
format!(
"OpenOptions(read_only={:?}, create_if_missing={:?}, cache_enabled={:?})",
self.read_only, self.create_if_missing, self.cache_enabled
)
}
}
impl TryFrom<OpenOptions> for RustOpenOptions {
type Error = PyErr;
fn try_from(opts: OpenOptions) -> Result<Self, Self::Error> {
opts.to_single_file_options()
}
}
impl OpenOptions {
pub fn to_single_file_options(&self) -> PyResult<RustOpenOptions> {
let mut rust_opts = RustOpenOptions::new();
if let Some(v) = self.read_only {
rust_opts = rust_opts.read_only(v);
}
if let Some(v) = self.create_if_missing {
rust_opts = rust_opts.create_if_missing(v);
}
if let Some(v) = self.mvcc {
rust_opts = rust_opts.mvcc(v);
}
if let Some(v) = self.mvcc_gc_interval_ms {
rust_opts = rust_opts.mvcc_gc_interval_ms(v as u64);
}
if let Some(v) = self.mvcc_retention_ms {
rust_opts = rust_opts.mvcc_retention_ms(v as u64);
}
if let Some(v) = self.mvcc_max_chain_depth {
rust_opts = rust_opts.mvcc_max_chain_depth(v as usize);
}
if let Some(v) = self.page_size {
rust_opts = rust_opts.page_size(v as usize);
}
if let Some(v) = self.wal_size {
rust_opts = rust_opts.wal_size(v as usize);
}
if let Some(v) = self.auto_checkpoint {
rust_opts = rust_opts.auto_checkpoint(v);
}
if let Some(v) = self.checkpoint_threshold {
rust_opts = rust_opts.checkpoint_threshold(v);
}
if let Some(v) = self.background_checkpoint {
rust_opts = rust_opts.background_checkpoint(v);
}
if let Some(ref compression) = self.checkpoint_compression {
rust_opts = rust_opts.checkpoint_compression(Some(compression.to_core()?));
}
if self.cache_enabled == Some(true) {
let property_cache = Some(PropertyCacheConfig {
max_node_props: self.cache_max_node_props.unwrap_or(10000) as usize,
max_edge_props: self.cache_max_edge_props.unwrap_or(10000) as usize,
});
let traversal_cache = Some(TraversalCacheConfig {
max_entries: self.cache_max_traversal_entries.unwrap_or(5000) as usize,
max_neighbors_per_entry: 100,
});
let query_cache = Some(QueryCacheConfig {
max_entries: self.cache_max_query_entries.unwrap_or(1000) as usize,
ttl_ms: self.cache_query_ttl_ms.map(|v| v as u64),
});
rust_opts = rust_opts.cache(Some(CacheOptions {
enabled: true,
property_cache,
traversal_cache,
query_cache,
}));
}
if let Some(sync) = self.sync_mode {
rust_opts = rust_opts.sync_mode(sync.mode);
}
if let Some(enabled) = self.group_commit_enabled {
rust_opts = rust_opts.group_commit_enabled(enabled);
}
if let Some(window_ms) = self.group_commit_window_ms {
if window_ms >= 0 {
rust_opts = rust_opts.group_commit_window_ms(window_ms as u64);
}
}
if let Some(mode) = self.snapshot_parse_mode {
rust_opts = rust_opts.snapshot_parse_mode(mode.mode);
}
if let Some(ref role) = self.replication_role {
let role = ReplicationRole::from_str(role).map_err(|error| {
PyValueError::new_err(format!("Invalid replication_role '{role}': {error}"))
})?;
rust_opts = rust_opts.replication_role(role);
}
if let Some(ref path) = self.replication_sidecar_path {
rust_opts = rust_opts.replication_sidecar_path(path);
}
if let Some(ref path) = self.replication_source_db_path {
rust_opts = rust_opts.replication_source_db_path(path);
}
if let Some(ref path) = self.replication_source_sidecar_path {
rust_opts = rust_opts.replication_source_sidecar_path(path);
}
if let Some(value) = self.replication_segment_max_bytes {
if value < 0 {
return Err(PyValueError::new_err(
"replication_segment_max_bytes must be non-negative",
));
}
rust_opts = rust_opts.replication_segment_max_bytes(value as u64);
}
if let Some(value) = self.replication_retention_min_entries {
if value < 0 {
return Err(PyValueError::new_err(
"replication_retention_min_entries must be non-negative",
));
}
rust_opts = rust_opts.replication_retention_min_entries(value as u64);
}
if let Some(value) = self.replication_retention_min_ms {
if value < 0 {
return Err(PyValueError::new_err(
"replication_retention_min_ms must be non-negative",
));
}
rust_opts = rust_opts.replication_retention_min_ms(value as u64);
}
Ok(rust_opts)
}
pub fn from_kite_options(opts: RustKiteOptions) -> Self {
let replication_role = match opts.replication_role {
ReplicationRole::Disabled => "disabled",
ReplicationRole::Primary => "primary",
ReplicationRole::Replica => "replica",
}
.to_string();
Self {
read_only: Some(opts.read_only),
create_if_missing: Some(opts.create_if_missing),
mvcc: Some(opts.mvcc),
mvcc_gc_interval_ms: opts.mvcc_gc_interval_ms.and_then(|v| i64::try_from(v).ok()),
mvcc_retention_ms: opts.mvcc_retention_ms.and_then(|v| i64::try_from(v).ok()),
mvcc_max_chain_depth: opts
.mvcc_max_chain_depth
.and_then(|v| i64::try_from(v).ok()),
page_size: None,
wal_size: opts.wal_size.and_then(|v| u32::try_from(v).ok()),
auto_checkpoint: None,
checkpoint_threshold: opts.checkpoint_threshold,
background_checkpoint: None,
checkpoint_compression: None,
cache_snapshot: None,
cache_enabled: None,
cache_max_node_props: None,
cache_max_edge_props: None,
cache_max_traversal_entries: None,
cache_max_query_entries: None,
cache_query_ttl_ms: None,
sync_mode: Some(SyncMode {
mode: opts.sync_mode,
}),
group_commit_enabled: Some(opts.group_commit_enabled),
group_commit_window_ms: i64::try_from(opts.group_commit_window_ms).ok(),
snapshot_parse_mode: None,
replication_role: Some(replication_role),
replication_sidecar_path: opts
.replication_sidecar_path
.map(|p| p.to_string_lossy().to_string()),
replication_source_db_path: opts
.replication_source_db_path
.map(|p| p.to_string_lossy().to_string()),
replication_source_sidecar_path: opts
.replication_source_sidecar_path
.map(|p| p.to_string_lossy().to_string()),
replication_segment_max_bytes: opts
.replication_segment_max_bytes
.and_then(|v| i64::try_from(v).ok()),
replication_retention_min_entries: opts
.replication_retention_min_entries
.and_then(|v| i64::try_from(v).ok()),
replication_retention_min_ms: opts
.replication_retention_min_ms
.and_then(|v| i64::try_from(v).ok()),
}
}
}
#[pyclass(name = "RuntimeProfile")]
#[derive(Debug, Clone)]
pub struct RuntimeProfile {
#[pyo3(get, set)]
pub open_options: OpenOptions,
#[pyo3(get, set)]
pub close_checkpoint_if_wal_usage_at_least: Option<f64>,
}
#[pymethods]
impl RuntimeProfile {
fn __repr__(&self) -> String {
format!(
"RuntimeProfile(close_checkpoint_if_wal_usage_at_least={:?})",
self.close_checkpoint_if_wal_usage_at_least
)
}
}
impl RuntimeProfile {
pub fn from_kite_runtime_profile(profile: RustKiteRuntimeProfile) -> Self {
Self {
open_options: OpenOptions::from_kite_options(profile.options),
close_checkpoint_if_wal_usage_at_least: profile.close_checkpoint_if_wal_usage_at_least,
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_sync_mode_full() {
let mode = SyncMode::full();
assert_eq!(mode.mode, RustSyncMode::Full);
}
#[test]
fn test_sync_mode_normal() {
let mode = SyncMode::normal();
assert_eq!(mode.mode, RustSyncMode::Normal);
}
#[test]
fn test_sync_mode_off() {
let mode = SyncMode::off();
assert_eq!(mode.mode, RustSyncMode::Off);
}
#[test]
fn test_open_options_default() {
let opts = OpenOptions::default();
assert!(opts.read_only.is_none());
assert!(opts.create_if_missing.is_none());
}
#[test]
fn test_open_options_to_rust() {
let opts = OpenOptions {
read_only: Some(true),
create_if_missing: Some(false),
page_size: Some(8192),
group_commit_enabled: Some(true),
group_commit_window_ms: Some(5),
..Default::default()
};
let rust_opts: RustOpenOptions = opts.try_into().expect("expected value");
assert!(rust_opts.read_only);
assert!(!rust_opts.create_if_missing);
assert!(rust_opts.group_commit_enabled);
assert_eq!(rust_opts.group_commit_window_ms, 5);
}
}