use napi::bindgen_prelude::*;
use napi_derive::napi;
use std::path::PathBuf;
use std::str::FromStr;
use super::traversal::{
JsPathConfig, JsPathResult, JsTraversalDirection, JsTraversalResult, JsTraversalStep,
JsTraverseOptions,
};
use crate::api::kite::KiteRuntimeProfile as RustKiteRuntimeProfile;
use crate::api::pathfinding::{bfs, dijkstra, yen_k_shortest, PathConfig};
use crate::api::traversal::{
TraversalBuilder as RustTraversalBuilder, TraversalDirection, TraverseOptions,
};
use crate::backup as core_backup;
use crate::core::single_file::{
close_single_file, close_single_file_with_options, is_single_file_path, open_single_file,
single_file_extension, ResizeWalOptions as RustResizeWalOptions,
SingleFileCloseOptions as RustSingleFileCloseOptions, SingleFileDB as RustSingleFileDB,
SingleFileOpenOptions as RustOpenOptions,
SingleFileOptimizeOptions as RustSingleFileOptimizeOptions,
SnapshotParseMode as RustSnapshotParseMode, SyncMode as RustSyncMode,
VacuumOptions as RustVacuumOptions,
};
use crate::export as ray_export;
use crate::metrics as core_metrics;
use crate::replication::primary::{
PrimaryReplicationStatus, PrimaryRetentionOutcome, ReplicaLagStatus,
};
use crate::replication::replica::ReplicaReplicationStatus;
use crate::replication::types::{CommitToken, ReplicationRole as RustReplicationRole};
use crate::streaming;
use crate::types::{
CheckResult as RustCheckResult, ETypeId, Edge, EdgeWithProps as CoreEdgeWithProps, NodeId,
PropKeyId, PropValue,
};
use crate::util::compression::{CompressionOptions as CoreCompressionOptions, CompressionType};
use serde_json;
#[napi(string_enum)]
#[derive(Debug)]
pub enum JsSyncMode {
Full,
Normal,
Off,
}
impl From<JsSyncMode> for RustSyncMode {
fn from(mode: JsSyncMode) -> Self {
match mode {
JsSyncMode::Full => RustSyncMode::Full,
JsSyncMode::Normal => RustSyncMode::Normal,
JsSyncMode::Off => RustSyncMode::Off,
}
}
}
#[napi(string_enum)]
#[derive(Debug)]
pub enum JsSnapshotParseMode {
Strict,
Salvage,
}
impl From<JsSnapshotParseMode> for RustSnapshotParseMode {
fn from(mode: JsSnapshotParseMode) -> Self {
match mode {
JsSnapshotParseMode::Strict => RustSnapshotParseMode::Strict,
JsSnapshotParseMode::Salvage => RustSnapshotParseMode::Salvage,
}
}
}
#[napi(string_enum)]
#[derive(Debug)]
pub enum JsReplicationRole {
Disabled,
Primary,
Replica,
}
impl From<JsReplicationRole> for RustReplicationRole {
fn from(role: JsReplicationRole) -> Self {
match role {
JsReplicationRole::Disabled => RustReplicationRole::Disabled,
JsReplicationRole::Primary => RustReplicationRole::Primary,
JsReplicationRole::Replica => RustReplicationRole::Replica,
}
}
}
#[napi(object)]
#[derive(Debug, Default)]
pub struct OpenOptions {
pub read_only: Option<bool>,
pub create_if_missing: Option<bool>,
pub mvcc: Option<bool>,
pub mvcc_gc_interval_ms: Option<i64>,
pub mvcc_retention_ms: Option<i64>,
pub mvcc_max_chain_depth: Option<i64>,
pub page_size: Option<u32>,
pub wal_size: Option<u32>,
pub auto_checkpoint: Option<bool>,
pub checkpoint_threshold: Option<f64>,
pub background_checkpoint: Option<bool>,
pub checkpoint_compression: Option<CompressionOptions>,
pub cache_enabled: Option<bool>,
pub cache_max_node_props: Option<i64>,
pub cache_max_edge_props: Option<i64>,
pub cache_max_traversal_entries: Option<i64>,
pub cache_max_query_entries: Option<i64>,
pub cache_query_ttl_ms: Option<i64>,
pub sync_mode: Option<JsSyncMode>,
pub group_commit_enabled: Option<bool>,
pub group_commit_window_ms: Option<i64>,
pub snapshot_parse_mode: Option<JsSnapshotParseMode>,
pub replication_role: Option<JsReplicationRole>,
pub replication_sidecar_path: Option<String>,
pub replication_source_db_path: Option<String>,
pub replication_source_sidecar_path: Option<String>,
pub replication_segment_max_bytes: Option<i64>,
pub replication_retention_min_entries: Option<i64>,
pub replication_retention_min_ms: Option<i64>,
}
impl From<OpenOptions> for RustOpenOptions {
fn from(opts: OpenOptions) -> Self {
use crate::types::{CacheOptions, PropertyCacheConfig, QueryCacheConfig, TraversalCacheConfig};
let mut rust_opts = RustOpenOptions::new();
if let Some(v) = opts.read_only {
rust_opts = rust_opts.read_only(v);
}
if let Some(v) = opts.create_if_missing {
rust_opts = rust_opts.create_if_missing(v);
}
if let Some(v) = opts.mvcc {
rust_opts = rust_opts.mvcc(v);
}
if let Some(v) = opts.mvcc_gc_interval_ms {
rust_opts = rust_opts.mvcc_gc_interval_ms(v as u64);
}
if let Some(v) = opts.mvcc_retention_ms {
rust_opts = rust_opts.mvcc_retention_ms(v as u64);
}
if let Some(v) = opts.mvcc_max_chain_depth {
rust_opts = rust_opts.mvcc_max_chain_depth(v as usize);
}
if let Some(v) = opts.page_size {
rust_opts = rust_opts.page_size(v as usize);
}
if let Some(v) = opts.wal_size {
rust_opts = rust_opts.wal_size(v as usize);
}
if let Some(v) = opts.auto_checkpoint {
rust_opts = rust_opts.auto_checkpoint(v);
}
if let Some(v) = opts.checkpoint_threshold {
rust_opts = rust_opts.checkpoint_threshold(v);
}
if let Some(v) = opts.background_checkpoint {
rust_opts = rust_opts.background_checkpoint(v);
}
if let Some(compression) = opts.checkpoint_compression {
rust_opts = rust_opts.checkpoint_compression(Some(compression.into()));
}
if opts.cache_enabled == Some(true) {
let property_cache = Some(PropertyCacheConfig {
max_node_props: opts.cache_max_node_props.unwrap_or(10000) as usize,
max_edge_props: opts.cache_max_edge_props.unwrap_or(10000) as usize,
});
let traversal_cache = Some(TraversalCacheConfig {
max_entries: opts.cache_max_traversal_entries.unwrap_or(5000) as usize,
max_neighbors_per_entry: 100,
});
let query_cache = Some(QueryCacheConfig {
max_entries: opts.cache_max_query_entries.unwrap_or(1000) as usize,
ttl_ms: opts.cache_query_ttl_ms.map(|v| v as u64),
});
rust_opts = rust_opts.cache(Some(CacheOptions {
enabled: true,
property_cache,
traversal_cache,
query_cache,
}));
}
if let Some(mode) = opts.sync_mode {
rust_opts = rust_opts.sync_mode(mode.into());
}
if let Some(enabled) = opts.group_commit_enabled {
rust_opts = rust_opts.group_commit_enabled(enabled);
}
if let Some(window_ms) = opts.group_commit_window_ms {
if window_ms >= 0 {
rust_opts = rust_opts.group_commit_window_ms(window_ms as u64);
}
}
if let Some(mode) = opts.snapshot_parse_mode {
rust_opts = rust_opts.snapshot_parse_mode(mode.into());
}
if let Some(role) = opts.replication_role {
rust_opts = rust_opts.replication_role(role.into());
}
if let Some(path) = opts.replication_sidecar_path {
rust_opts = rust_opts.replication_sidecar_path(path);
}
if let Some(path) = opts.replication_source_db_path {
rust_opts = rust_opts.replication_source_db_path(path);
}
if let Some(path) = opts.replication_source_sidecar_path {
rust_opts = rust_opts.replication_source_sidecar_path(path);
}
if let Some(value) = opts.replication_segment_max_bytes {
if value >= 0 {
rust_opts = rust_opts.replication_segment_max_bytes(value as u64);
}
}
if let Some(value) = opts.replication_retention_min_entries {
if value >= 0 {
rust_opts = rust_opts.replication_retention_min_entries(value as u64);
}
}
if let Some(value) = opts.replication_retention_min_ms {
if value >= 0 {
rust_opts = rust_opts.replication_retention_min_ms(value as u64);
}
}
rust_opts
}
}
fn js_sync_mode_from_rust(mode: RustSyncMode) -> JsSyncMode {
match mode {
RustSyncMode::Full => JsSyncMode::Full,
RustSyncMode::Normal => JsSyncMode::Normal,
RustSyncMode::Off => JsSyncMode::Off,
}
}
fn js_replication_role_from_rust(role: RustReplicationRole) -> JsReplicationRole {
match role {
RustReplicationRole::Disabled => JsReplicationRole::Disabled,
RustReplicationRole::Primary => JsReplicationRole::Primary,
RustReplicationRole::Replica => JsReplicationRole::Replica,
}
}
fn open_options_from_kite_profile_options(opts: crate::api::kite::KiteOptions) -> OpenOptions {
OpenOptions {
read_only: Some(opts.read_only),
create_if_missing: Some(opts.create_if_missing),
mvcc: Some(opts.mvcc),
mvcc_gc_interval_ms: opts.mvcc_gc_interval_ms.and_then(|v| i64::try_from(v).ok()),
mvcc_retention_ms: opts.mvcc_retention_ms.and_then(|v| i64::try_from(v).ok()),
mvcc_max_chain_depth: opts
.mvcc_max_chain_depth
.and_then(|v| i64::try_from(v).ok()),
page_size: None,
wal_size: opts.wal_size.and_then(|v| u32::try_from(v).ok()),
auto_checkpoint: None,
checkpoint_threshold: opts.checkpoint_threshold,
background_checkpoint: None,
checkpoint_compression: None,
cache_enabled: None,
cache_max_node_props: None,
cache_max_edge_props: None,
cache_max_traversal_entries: None,
cache_max_query_entries: None,
cache_query_ttl_ms: None,
sync_mode: Some(js_sync_mode_from_rust(opts.sync_mode)),
group_commit_enabled: Some(opts.group_commit_enabled),
group_commit_window_ms: i64::try_from(opts.group_commit_window_ms).ok(),
snapshot_parse_mode: None,
replication_role: Some(js_replication_role_from_rust(opts.replication_role)),
replication_sidecar_path: opts
.replication_sidecar_path
.map(|p| p.to_string_lossy().to_string()),
replication_source_db_path: opts
.replication_source_db_path
.map(|p| p.to_string_lossy().to_string()),
replication_source_sidecar_path: opts
.replication_source_sidecar_path
.map(|p| p.to_string_lossy().to_string()),
replication_segment_max_bytes: opts
.replication_segment_max_bytes
.and_then(|v| i64::try_from(v).ok()),
replication_retention_min_entries: opts
.replication_retention_min_entries
.and_then(|v| i64::try_from(v).ok()),
replication_retention_min_ms: opts
.replication_retention_min_ms
.and_then(|v| i64::try_from(v).ok()),
}
}
#[napi(object)]
#[derive(Debug, Default)]
pub struct RuntimeProfile {
pub open_options: OpenOptions,
pub close_checkpoint_if_wal_usage_at_least: Option<f64>,
}
fn runtime_profile_from_rust(profile: RustKiteRuntimeProfile) -> RuntimeProfile {
RuntimeProfile {
open_options: open_options_from_kite_profile_options(profile.options),
close_checkpoint_if_wal_usage_at_least: profile.close_checkpoint_if_wal_usage_at_least,
}
}
#[napi(object)]
#[derive(Debug, Default)]
pub struct VacuumOptions {
pub shrink_wal: Option<bool>,
pub min_wal_size: Option<i64>,
}
impl From<VacuumOptions> for RustVacuumOptions {
fn from(opts: VacuumOptions) -> Self {
let min_wal_size = opts
.min_wal_size
.and_then(|v| if v >= 0 { Some(v as u64) } else { None });
Self {
shrink_wal: opts.shrink_wal.unwrap_or(true),
min_wal_size,
}
}
}
#[napi(object)]
#[derive(Debug, Default)]
pub struct ResizeWalOptions {
pub allow_shrink: Option<bool>,
pub checkpoint: Option<bool>,
}
impl From<ResizeWalOptions> for RustResizeWalOptions {
fn from(opts: ResizeWalOptions) -> Self {
Self {
allow_shrink: opts.allow_shrink.unwrap_or(false),
checkpoint: opts.checkpoint.unwrap_or(true),
}
}
}
#[napi(string_enum)]
#[derive(Debug)]
pub enum JsCompressionType {
None,
Zstd,
Gzip,
Deflate,
}
impl From<JsCompressionType> for CompressionType {
fn from(value: JsCompressionType) -> Self {
match value {
JsCompressionType::None => CompressionType::None,
JsCompressionType::Zstd => CompressionType::Zstd,
JsCompressionType::Gzip => CompressionType::Gzip,
JsCompressionType::Deflate => CompressionType::Deflate,
}
}
}
#[napi(object)]
#[derive(Debug, Default)]
pub struct CompressionOptions {
pub enabled: Option<bool>,
pub r#type: Option<JsCompressionType>,
pub min_size: Option<u32>,
pub level: Option<i32>,
}
impl From<CompressionOptions> for CoreCompressionOptions {
fn from(opts: CompressionOptions) -> Self {
let mut out = CoreCompressionOptions::default();
if let Some(enabled) = opts.enabled {
out.enabled = enabled;
}
if let Some(t) = opts.r#type {
out.compression_type = t.into();
}
if let Some(min_size) = opts.min_size {
out.min_size = min_size as usize;
}
if let Some(level) = opts.level {
out.level = level;
}
out
}
}
#[napi(object)]
#[derive(Debug, Default)]
pub struct SingleFileOptimizeOptions {
pub compression: Option<CompressionOptions>,
}
impl From<SingleFileOptimizeOptions> for RustSingleFileOptimizeOptions {
fn from(opts: SingleFileOptimizeOptions) -> Self {
RustSingleFileOptimizeOptions {
compression: opts.compression.map(Into::into),
}
}
}
#[napi(object)]
pub struct DbStats {
pub snapshot_gen: i64,
pub snapshot_nodes: i64,
pub snapshot_edges: i64,
pub snapshot_max_node_id: i64,
pub delta_nodes_created: i64,
pub delta_nodes_deleted: i64,
pub delta_edges_added: i64,
pub delta_edges_deleted: i64,
pub wal_segment: i64,
pub wal_bytes: i64,
pub recommend_compact: bool,
pub mvcc_stats: Option<MvccStats>,
}
#[napi(object)]
pub struct MvccStats {
pub active_transactions: i64,
pub min_active_ts: i64,
pub versions_pruned: i64,
pub gc_runs: i64,
pub last_gc_time: i64,
pub committed_writes_size: i64,
pub committed_writes_pruned: i64,
}
#[napi(object)]
pub struct JsReplicaLagStatus {
pub replica_id: String,
pub epoch: i64,
pub applied_log_index: i64,
}
#[napi(object)]
pub struct JsPrimaryReplicationStatus {
pub role: String,
pub epoch: i64,
pub head_log_index: i64,
pub retained_floor: i64,
pub replica_lags: Vec<JsReplicaLagStatus>,
pub sidecar_path: String,
pub last_token: Option<String>,
pub append_attempts: i64,
pub append_failures: i64,
pub append_successes: i64,
}
#[napi(object)]
pub struct JsReplicaReplicationStatus {
pub role: String,
pub source_db_path: Option<String>,
pub source_sidecar_path: Option<String>,
pub applied_epoch: i64,
pub applied_log_index: i64,
pub last_error: Option<String>,
pub needs_reseed: bool,
}
#[napi(object)]
pub struct JsPrimaryRetentionOutcome {
pub pruned_segments: i64,
pub retained_floor: i64,
}
impl From<ReplicaLagStatus> for JsReplicaLagStatus {
fn from(value: ReplicaLagStatus) -> Self {
Self {
replica_id: value.replica_id,
epoch: value.epoch as i64,
applied_log_index: value.applied_log_index as i64,
}
}
}
impl From<PrimaryReplicationStatus> for JsPrimaryReplicationStatus {
fn from(value: PrimaryReplicationStatus) -> Self {
Self {
role: value.role.to_string(),
epoch: value.epoch as i64,
head_log_index: value.head_log_index as i64,
retained_floor: value.retained_floor as i64,
replica_lags: value.replica_lags.into_iter().map(Into::into).collect(),
sidecar_path: value.sidecar_path.to_string_lossy().to_string(),
last_token: value.last_token.map(|token| token.to_string()),
append_attempts: value.append_attempts as i64,
append_failures: value.append_failures as i64,
append_successes: value.append_successes as i64,
}
}
}
impl From<ReplicaReplicationStatus> for JsReplicaReplicationStatus {
fn from(value: ReplicaReplicationStatus) -> Self {
Self {
role: value.role.to_string(),
source_db_path: value
.source_db_path
.map(|path| path.to_string_lossy().to_string()),
source_sidecar_path: value
.source_sidecar_path
.map(|path| path.to_string_lossy().to_string()),
applied_epoch: value.applied_epoch as i64,
applied_log_index: value.applied_log_index as i64,
last_error: value.last_error,
needs_reseed: value.needs_reseed,
}
}
}
impl From<PrimaryRetentionOutcome> for JsPrimaryRetentionOutcome {
fn from(value: PrimaryRetentionOutcome) -> Self {
Self {
pruned_segments: value.pruned_segments as i64,
retained_floor: value.retained_floor as i64,
}
}
}
#[napi(object)]
pub struct ExportOptions {
pub include_nodes: Option<bool>,
pub include_edges: Option<bool>,
pub include_schema: Option<bool>,
pub pretty: Option<bool>,
}
impl ExportOptions {
fn into_rust(self) -> ray_export::ExportOptions {
let mut opts = ray_export::ExportOptions::default();
if let Some(v) = self.include_nodes {
opts.include_nodes = v;
}
if let Some(v) = self.include_edges {
opts.include_edges = v;
}
if let Some(v) = self.include_schema {
opts.include_schema = v;
}
if let Some(v) = self.pretty {
opts.pretty = v;
}
opts
}
}
#[napi(object)]
pub struct ImportOptions {
pub skip_existing: Option<bool>,
pub batch_size: Option<i64>,
}
impl ImportOptions {
fn into_rust(self) -> ray_export::ImportOptions {
let mut opts = ray_export::ImportOptions::default();
if let Some(v) = self.skip_existing {
opts.skip_existing = v;
}
if let Some(v) = self.batch_size {
if v > 0 {
opts.batch_size = v as usize;
}
}
opts
}
}
#[napi(object)]
pub struct ExportResult {
pub node_count: i64,
pub edge_count: i64,
}
#[napi(object)]
pub struct ImportResult {
pub node_count: i64,
pub edge_count: i64,
pub skipped: i64,
}
#[napi(object)]
#[derive(Debug, Default)]
pub struct StreamOptions {
pub batch_size: Option<i64>,
}
impl StreamOptions {
fn into_rust(self) -> Result<crate::streaming::StreamOptions> {
let batch_size = self.batch_size.unwrap_or(0);
if batch_size < 0 {
return Err(Error::from_reason("batchSize must be non-negative"));
}
Ok(crate::streaming::StreamOptions {
batch_size: batch_size as usize,
})
}
}
#[napi(object)]
#[derive(Debug, Default)]
pub struct PaginationOptions {
pub limit: Option<i64>,
pub cursor: Option<String>,
}
impl PaginationOptions {
fn into_rust(self) -> Result<crate::streaming::PaginationOptions> {
let limit = self.limit.unwrap_or(0);
if limit < 0 {
return Err(Error::from_reason("limit must be non-negative"));
}
Ok(crate::streaming::PaginationOptions {
limit: limit as usize,
cursor: self.cursor,
})
}
}
#[napi(object)]
pub struct NodeWithProps {
pub id: i64,
pub key: Option<String>,
pub props: Vec<JsNodeProp>,
}
#[napi(object)]
pub struct EdgeWithProps {
pub src: i64,
pub etype: u32,
pub dst: i64,
pub props: Vec<JsNodeProp>,
}
#[napi(object)]
pub struct NodePage {
pub items: Vec<i64>,
pub next_cursor: Option<String>,
pub has_more: bool,
pub total: Option<i64>,
}
#[napi(object)]
pub struct EdgePage {
pub items: Vec<JsFullEdge>,
pub next_cursor: Option<String>,
pub has_more: bool,
pub total: Option<i64>,
}
#[napi(object)]
pub struct CheckResult {
pub valid: bool,
pub errors: Vec<String>,
pub warnings: Vec<String>,
}
impl From<RustCheckResult> for CheckResult {
fn from(result: RustCheckResult) -> Self {
CheckResult {
valid: result.valid,
errors: result.errors,
warnings: result.warnings,
}
}
}
#[napi(object)]
pub struct JsCacheStats {
pub property_cache_hits: i64,
pub property_cache_misses: i64,
pub property_cache_size: i64,
pub traversal_cache_hits: i64,
pub traversal_cache_misses: i64,
pub traversal_cache_size: i64,
pub query_cache_hits: i64,
pub query_cache_misses: i64,
pub query_cache_size: i64,
}
#[napi(object)]
pub struct CacheLayerMetrics {
pub hits: i64,
pub misses: i64,
pub hit_rate: f64,
pub size: i64,
pub max_size: i64,
pub utilization_percent: f64,
}
#[napi(object)]
pub struct CacheMetrics {
pub enabled: bool,
pub property_cache: CacheLayerMetrics,
pub traversal_cache: CacheLayerMetrics,
pub query_cache: CacheLayerMetrics,
}
#[napi(object)]
pub struct DataMetrics {
pub node_count: i64,
pub edge_count: i64,
pub delta_nodes_created: i64,
pub delta_nodes_deleted: i64,
pub delta_edges_added: i64,
pub delta_edges_deleted: i64,
pub snapshot_generation: i64,
pub max_node_id: i64,
pub schema_labels: i64,
pub schema_etypes: i64,
pub schema_prop_keys: i64,
}
#[napi(object)]
pub struct MvccMetrics {
pub enabled: bool,
pub active_transactions: i64,
pub versions_pruned: i64,
pub gc_runs: i64,
pub min_active_timestamp: i64,
pub committed_writes_size: i64,
pub committed_writes_pruned: i64,
}
#[napi(object)]
pub struct PrimaryReplicationMetrics {
pub epoch: i64,
pub head_log_index: i64,
pub retained_floor: i64,
pub replica_count: i64,
pub stale_epoch_replica_count: i64,
pub max_replica_lag: i64,
pub min_replica_applied_log_index: Option<i64>,
pub sidecar_path: String,
pub last_token: Option<String>,
pub append_attempts: i64,
pub append_failures: i64,
pub append_successes: i64,
}
#[napi(object)]
pub struct ReplicaReplicationMetrics {
pub applied_epoch: i64,
pub applied_log_index: i64,
pub needs_reseed: bool,
pub last_error: Option<String>,
}
#[napi(object)]
pub struct ReplicationMetrics {
pub enabled: bool,
pub role: String,
pub primary: Option<PrimaryReplicationMetrics>,
pub replica: Option<ReplicaReplicationMetrics>,
}
#[napi(object)]
pub struct MemoryMetrics {
pub delta_estimate_bytes: i64,
pub cache_estimate_bytes: i64,
pub snapshot_bytes: i64,
pub total_estimate_bytes: i64,
}
#[napi(object)]
pub struct DatabaseMetrics {
pub path: String,
pub is_single_file: bool,
pub read_only: bool,
pub data: DataMetrics,
pub cache: CacheMetrics,
pub mvcc: Option<MvccMetrics>,
pub replication: ReplicationMetrics,
pub memory: MemoryMetrics,
pub collected_at: i64,
}
#[napi(object)]
pub struct HealthCheckEntry {
pub name: String,
pub passed: bool,
pub message: String,
}
#[napi(object)]
pub struct HealthCheckResult {
pub healthy: bool,
pub checks: Vec<HealthCheckEntry>,
}
#[napi(object)]
pub struct OtlpHttpExportResult {
pub status_code: i64,
pub response_body: String,
}
#[napi(object)]
#[derive(Default, Clone)]
pub struct PushReplicationMetricsOtelOptions {
pub timeout_ms: Option<i64>,
pub bearer_token: Option<String>,
pub retry_max_attempts: Option<i64>,
pub retry_backoff_ms: Option<i64>,
pub retry_backoff_max_ms: Option<i64>,
pub retry_jitter_ratio: Option<f64>,
pub adaptive_retry: Option<bool>,
pub adaptive_retry_mode: Option<String>,
pub adaptive_retry_ewma_alpha: Option<f64>,
pub circuit_breaker_failure_threshold: Option<i64>,
pub circuit_breaker_open_ms: Option<i64>,
pub circuit_breaker_half_open_probes: Option<i64>,
pub circuit_breaker_state_path: Option<String>,
pub circuit_breaker_state_url: Option<String>,
pub circuit_breaker_state_patch: Option<bool>,
pub circuit_breaker_state_patch_batch: Option<bool>,
pub circuit_breaker_state_patch_batch_max_keys: Option<i64>,
pub circuit_breaker_state_patch_merge: Option<bool>,
pub circuit_breaker_state_patch_merge_max_keys: Option<i64>,
pub circuit_breaker_state_patch_retry_max_attempts: Option<i64>,
pub circuit_breaker_state_cas: Option<bool>,
pub circuit_breaker_state_lease_id: Option<String>,
pub circuit_breaker_scope_key: Option<String>,
pub compression_gzip: Option<bool>,
pub https_only: Option<bool>,
pub ca_cert_pem_path: Option<String>,
pub client_cert_pem_path: Option<String>,
pub client_key_pem_path: Option<String>,
}
impl From<core_metrics::CacheLayerMetrics> for CacheLayerMetrics {
fn from(metrics: core_metrics::CacheLayerMetrics) -> Self {
CacheLayerMetrics {
hits: metrics.hits,
misses: metrics.misses,
hit_rate: metrics.hit_rate,
size: metrics.size,
max_size: metrics.max_size,
utilization_percent: metrics.utilization_percent,
}
}
}
impl From<core_metrics::CacheMetrics> for CacheMetrics {
fn from(metrics: core_metrics::CacheMetrics) -> Self {
CacheMetrics {
enabled: metrics.enabled,
property_cache: metrics.property_cache.into(),
traversal_cache: metrics.traversal_cache.into(),
query_cache: metrics.query_cache.into(),
}
}
}
impl From<core_metrics::DataMetrics> for DataMetrics {
fn from(metrics: core_metrics::DataMetrics) -> Self {
DataMetrics {
node_count: metrics.node_count,
edge_count: metrics.edge_count,
delta_nodes_created: metrics.delta_nodes_created,
delta_nodes_deleted: metrics.delta_nodes_deleted,
delta_edges_added: metrics.delta_edges_added,
delta_edges_deleted: metrics.delta_edges_deleted,
snapshot_generation: metrics.snapshot_generation,
max_node_id: metrics.max_node_id,
schema_labels: metrics.schema_labels,
schema_etypes: metrics.schema_etypes,
schema_prop_keys: metrics.schema_prop_keys,
}
}
}
impl From<core_metrics::MvccMetrics> for MvccMetrics {
fn from(metrics: core_metrics::MvccMetrics) -> Self {
MvccMetrics {
enabled: metrics.enabled,
active_transactions: metrics.active_transactions,
versions_pruned: metrics.versions_pruned,
gc_runs: metrics.gc_runs,
min_active_timestamp: metrics.min_active_timestamp,
committed_writes_size: metrics.committed_writes_size,
committed_writes_pruned: metrics.committed_writes_pruned,
}
}
}
impl From<core_metrics::PrimaryReplicationMetrics> for PrimaryReplicationMetrics {
fn from(metrics: core_metrics::PrimaryReplicationMetrics) -> Self {
PrimaryReplicationMetrics {
epoch: metrics.epoch,
head_log_index: metrics.head_log_index,
retained_floor: metrics.retained_floor,
replica_count: metrics.replica_count,
stale_epoch_replica_count: metrics.stale_epoch_replica_count,
max_replica_lag: metrics.max_replica_lag,
min_replica_applied_log_index: metrics.min_replica_applied_log_index,
sidecar_path: metrics.sidecar_path,
last_token: metrics.last_token,
append_attempts: metrics.append_attempts,
append_failures: metrics.append_failures,
append_successes: metrics.append_successes,
}
}
}
impl From<core_metrics::ReplicaReplicationMetrics> for ReplicaReplicationMetrics {
fn from(metrics: core_metrics::ReplicaReplicationMetrics) -> Self {
ReplicaReplicationMetrics {
applied_epoch: metrics.applied_epoch,
applied_log_index: metrics.applied_log_index,
needs_reseed: metrics.needs_reseed,
last_error: metrics.last_error,
}
}
}
impl From<core_metrics::ReplicationMetrics> for ReplicationMetrics {
fn from(metrics: core_metrics::ReplicationMetrics) -> Self {
ReplicationMetrics {
enabled: metrics.enabled,
role: metrics.role,
primary: metrics.primary.map(Into::into),
replica: metrics.replica.map(Into::into),
}
}
}
impl From<core_metrics::MemoryMetrics> for MemoryMetrics {
fn from(metrics: core_metrics::MemoryMetrics) -> Self {
MemoryMetrics {
delta_estimate_bytes: metrics.delta_estimate_bytes,
cache_estimate_bytes: metrics.cache_estimate_bytes,
snapshot_bytes: metrics.snapshot_bytes,
total_estimate_bytes: metrics.total_estimate_bytes,
}
}
}
impl From<core_metrics::DatabaseMetrics> for DatabaseMetrics {
fn from(metrics: core_metrics::DatabaseMetrics) -> Self {
DatabaseMetrics {
path: metrics.path,
is_single_file: metrics.is_single_file,
read_only: metrics.read_only,
data: metrics.data.into(),
cache: metrics.cache.into(),
mvcc: metrics.mvcc.map(Into::into),
replication: metrics.replication.into(),
memory: metrics.memory.into(),
collected_at: metrics.collected_at_ms,
}
}
}
impl From<core_metrics::HealthCheckEntry> for HealthCheckEntry {
fn from(entry: core_metrics::HealthCheckEntry) -> Self {
HealthCheckEntry {
name: entry.name,
passed: entry.passed,
message: entry.message,
}
}
}
impl From<core_metrics::HealthCheckResult> for HealthCheckResult {
fn from(result: core_metrics::HealthCheckResult) -> Self {
HealthCheckResult {
healthy: result.healthy,
checks: result.checks.into_iter().map(Into::into).collect(),
}
}
}
impl From<core_metrics::OtlpHttpExportResult> for OtlpHttpExportResult {
fn from(result: core_metrics::OtlpHttpExportResult) -> Self {
OtlpHttpExportResult {
status_code: result.status_code,
response_body: result.response_body,
}
}
}
#[napi(string_enum)]
#[derive(Clone)]
pub enum PropType {
Null,
Bool,
Int,
Float,
String,
Vector,
}
#[napi(object)]
#[derive(Clone)]
pub struct JsPropValue {
pub prop_type: PropType,
pub bool_value: Option<bool>,
pub int_value: Option<i64>,
pub float_value: Option<f64>,
pub string_value: Option<String>,
pub vector_value: Option<Vec<f64>>,
}
impl From<PropValue> for JsPropValue {
fn from(value: PropValue) -> Self {
match value {
PropValue::Null => JsPropValue {
prop_type: PropType::Null,
bool_value: None,
int_value: None,
float_value: None,
string_value: None,
vector_value: None,
},
PropValue::Bool(v) => JsPropValue {
prop_type: PropType::Bool,
bool_value: Some(v),
int_value: None,
float_value: None,
string_value: None,
vector_value: None,
},
PropValue::I64(v) => JsPropValue {
prop_type: PropType::Int,
bool_value: None,
int_value: Some(v),
float_value: None,
string_value: None,
vector_value: None,
},
PropValue::F64(v) => JsPropValue {
prop_type: PropType::Float,
bool_value: None,
int_value: None,
float_value: Some(v),
string_value: None,
vector_value: None,
},
PropValue::String(v) => JsPropValue {
prop_type: PropType::String,
bool_value: None,
int_value: None,
float_value: None,
string_value: Some(v),
vector_value: None,
},
PropValue::VectorF32(v) => JsPropValue {
prop_type: PropType::Vector,
bool_value: None,
int_value: None,
float_value: None,
string_value: None,
vector_value: Some(v.iter().map(|&x| x as f64).collect()),
},
}
}
}
impl From<JsPropValue> for PropValue {
fn from(value: JsPropValue) -> Self {
match value.prop_type {
PropType::Null => PropValue::Null,
PropType::Bool => PropValue::Bool(value.bool_value.unwrap_or(false)),
PropType::Int => PropValue::I64(value.int_value.unwrap_or(0)),
PropType::Float => PropValue::F64(value.float_value.unwrap_or(0.0)),
PropType::String => PropValue::String(value.string_value.unwrap_or_default()),
PropType::Vector => {
let vector = value.vector_value.unwrap_or_default();
PropValue::VectorF32(vector.iter().map(|&x| x as f32).collect())
}
}
}
}
#[napi(object)]
pub struct JsEdge {
pub etype: u32,
pub node_id: i64,
}
#[napi(object)]
pub struct JsFullEdge {
pub src: i64,
pub etype: u32,
pub dst: i64,
}
#[napi(object)]
pub struct JsEdgeWithPropsInput {
pub src: i64,
pub etype: u32,
pub dst: i64,
pub props: Vec<JsNodeProp>,
}
#[napi(object)]
pub struct JsNodeProp {
pub key_id: u32,
pub value: JsPropValue,
}
#[allow(clippy::large_enum_variant)]
enum DatabaseInner {
SingleFile(RustSingleFileDB),
}
#[napi]
pub struct Database {
inner: Option<DatabaseInner>,
}
#[napi]
impl Database {
#[napi(factory)]
pub fn open(path: String, options: Option<OpenOptions>) -> Result<Database> {
let options = options.unwrap_or_default();
let path_buf = PathBuf::from(&path);
if path_buf.exists() && path_buf.is_dir() {
return Err(Error::from_reason(
"Multi-file databases are no longer supported. Provide a single-file path.",
));
}
let mut db_path = path_buf;
if db_path.extension().is_some() {
if !is_single_file_path(&db_path) {
let ext = db_path
.extension()
.map(|value| value.to_string_lossy())
.unwrap_or_else(|| "".into());
return Err(Error::from_reason(format!(
"Invalid database extension '.{ext}'. Single-file databases must use {} (or pass a path without an extension).",
single_file_extension()
)));
}
} else {
db_path = PathBuf::from(format!("{path}{}", single_file_extension()));
}
let opts: RustOpenOptions = options.into();
let db = open_single_file(&db_path, opts)
.map_err(|e| Error::from_reason(format!("Failed to open database: {e}")))?;
Ok(Database {
inner: Some(DatabaseInner::SingleFile(db)),
})
}
#[napi]
pub fn close(&mut self) -> Result<()> {
if let Some(db) = self.inner.take() {
match db {
DatabaseInner::SingleFile(db) => {
close_single_file(db)
.map_err(|e| Error::from_reason(format!("Failed to close database: {e}")))?;
}
}
}
Ok(())
}
#[napi]
pub fn close_with_checkpoint_if_wal_over(&mut self, threshold: f64) -> Result<()> {
if let Some(db) = self.inner.take() {
match db {
DatabaseInner::SingleFile(db) => close_single_file_with_options(
db,
RustSingleFileCloseOptions::new().checkpoint_if_wal_usage_at_least(threshold),
)
.map_err(|e| Error::from_reason(format!("Failed to close database: {e}")))?,
}
}
Ok(())
}
#[napi(getter)]
pub fn is_open(&self) -> bool {
self.inner.is_some()
}
#[napi(getter)]
pub fn path(&self) -> Result<String> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.path.to_string_lossy().to_string()),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(getter)]
pub fn read_only(&self) -> Result<bool> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.read_only),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn begin(&self, read_only: Option<bool>) -> Result<i64> {
let read_only = read_only.unwrap_or(false);
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let txid = db
.begin(read_only)
.map_err(|e| Error::from_reason(format!("Failed to begin transaction: {e}")))?;
Ok(txid as i64)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn begin_bulk(&self) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let txid = db
.begin_bulk()
.map_err(|e| Error::from_reason(format!("Failed to begin bulk transaction: {e}")))?;
Ok(txid as i64)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn commit(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.commit()
.map_err(|e| Error::from_reason(format!("Failed to commit: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn commit_with_token(&self) -> Result<Option<String>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.commit_with_token()
.map(|token| token.map(|value| value.to_string()))
.map_err(|e| Error::from_reason(format!("Failed to commit with token: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn rollback(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.rollback()
.map_err(|e| Error::from_reason(format!("Failed to rollback: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn has_transaction(&self) -> Result<bool> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.has_transaction()),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn wait_for_token(&self, token: String, timeout_ms: i64) -> Result<bool> {
if timeout_ms < 0 {
return Err(Error::from_reason("timeoutMs must be non-negative"));
}
let token = CommitToken::from_str(&token)
.map_err(|e| Error::from_reason(format!("Invalid commit token: {e}")))?;
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.wait_for_token(token, timeout_ms as u64)
.map_err(|e| Error::from_reason(format!("Failed waiting for token: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn primary_replication_status(&self) -> Result<Option<JsPrimaryReplicationStatus>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.primary_replication_status().map(Into::into)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn replica_replication_status(&self) -> Result<Option<JsReplicaReplicationStatus>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.replica_replication_status().map(Into::into)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn primary_promote_to_next_epoch(&self) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.primary_promote_to_next_epoch()
.map(|epoch| epoch as i64)
.map_err(|e| Error::from_reason(format!("Failed to promote primary: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn primary_report_replica_progress(
&self,
replica_id: String,
epoch: i64,
applied_log_index: i64,
) -> Result<()> {
if epoch < 0 || applied_log_index < 0 {
return Err(Error::from_reason(
"epoch and appliedLogIndex must be non-negative",
));
}
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.primary_report_replica_progress(&replica_id, epoch as u64, applied_log_index as u64)
.map_err(|e| Error::from_reason(format!("Failed to report replica progress: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn primary_run_retention(&self) -> Result<JsPrimaryRetentionOutcome> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.primary_run_retention()
.map(Into::into)
.map_err(|e| Error::from_reason(format!("Failed to run retention: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn export_replication_snapshot_transport_json(
&self,
include_data: Option<bool>,
) -> Result<String> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.primary_export_snapshot_transport_json(include_data.unwrap_or(false))
.map_err(|e| Error::from_reason(format!("Failed to export replication snapshot: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn export_replication_log_transport_json(
&self,
cursor: Option<String>,
max_frames: Option<i64>,
max_bytes: Option<i64>,
include_payload: Option<bool>,
) -> Result<String> {
let max_frames = max_frames.unwrap_or(128);
let max_bytes = max_bytes.unwrap_or(1_048_576);
if max_frames <= 0 {
return Err(Error::from_reason("maxFrames must be positive"));
}
if max_bytes <= 0 {
return Err(Error::from_reason("maxBytes must be positive"));
}
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.primary_export_log_transport_json(
cursor.as_deref(),
max_frames as usize,
max_bytes as usize,
include_payload.unwrap_or(true),
)
.map_err(|e| Error::from_reason(format!("Failed to export replication log: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn replica_bootstrap_from_snapshot(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.replica_bootstrap_from_snapshot()
.map_err(|e| Error::from_reason(format!("Failed to bootstrap replica: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn replica_catch_up_once(&self, max_frames: i64) -> Result<i64> {
if max_frames < 0 {
return Err(Error::from_reason("maxFrames must be non-negative"));
}
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.replica_catch_up_once(max_frames as usize)
.map(|count| count as i64)
.map_err(|e| Error::from_reason(format!("Failed replica catch-up: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn replica_reseed_from_snapshot(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.replica_reseed_from_snapshot()
.map_err(|e| Error::from_reason(format!("Failed to reseed replica: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn create_node(&self, key: Option<String>) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let node_id = db
.create_node(key.as_deref())
.map_err(|e| Error::from_reason(format!("Failed to create node: {e}")))?;
Ok(node_id as i64)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn create_nodes_batch(&self, keys: Vec<Option<String>>) -> Result<Vec<i64>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let key_refs: Vec<Option<&str>> = keys.iter().map(|k| k.as_deref()).collect();
let node_ids = db
.create_nodes_batch(&key_refs)
.map_err(|e| Error::from_reason(format!("Failed to create nodes: {e}")))?;
Ok(node_ids.into_iter().map(|id| id as i64).collect())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn upsert_node(&self, key: String, props: Vec<JsNodeProp>) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let node_id = match db.node_by_key(&key) {
Some(id) => id,
None => db
.create_node(Some(&key))
.map_err(|e| Error::from_reason(format!("Failed to create node: {e}")))?,
};
for prop in props {
let key_id = prop.key_id as PropKeyId;
if matches!(prop.value.prop_type, PropType::Null) {
db.delete_node_prop(node_id, key_id)
.map_err(|e| Error::from_reason(format!("Failed to delete property: {e}")))?;
} else {
db.set_node_prop(node_id, key_id, prop.value.into())
.map_err(|e| Error::from_reason(format!("Failed to set property: {e}")))?;
}
}
Ok(node_id as i64)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn upsert_node_by_id(&self, node_id: i64, props: Vec<JsNodeProp>) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let node_id_u = node_id as NodeId;
if !db.node_exists(node_id_u) {
db.create_node_with_id(node_id_u, None)
.map_err(|e| Error::from_reason(format!("Failed to create node: {e}")))?;
}
for prop in props {
let key_id = prop.key_id as PropKeyId;
if matches!(prop.value.prop_type, PropType::Null) {
db.delete_node_prop(node_id_u, key_id)
.map_err(|e| Error::from_reason(format!("Failed to delete property: {e}")))?;
} else {
db.set_node_prop(node_id_u, key_id, prop.value.into())
.map_err(|e| Error::from_reason(format!("Failed to set property: {e}")))?;
}
}
Ok(node_id)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn delete_node(&self, node_id: i64) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.delete_node(node_id as NodeId)
.map_err(|e| Error::from_reason(format!("Failed to delete node: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn node_exists(&self, node_id: i64) -> Result<bool> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.node_exists(node_id as NodeId)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_node_by_key")]
pub fn node_by_key(&self, key: String) -> Result<Option<i64>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.node_by_key(&key).map(|id| id as i64)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_node_key")]
pub fn node_key(&self, node_id: i64) -> Result<Option<String>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.node_key(node_id as NodeId)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn list_nodes(&self) -> Result<Vec<i64>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
Ok(db.list_nodes().into_iter().map(|id| id as i64).collect())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn count_nodes(&self) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.count_nodes() as i64),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn add_edge(&self, src: i64, etype: u32, dst: i64) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.add_edge(src as NodeId, etype as ETypeId, dst as NodeId)
.map_err(|e| Error::from_reason(format!("Failed to add edge: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn add_edges_batch(&self, edges: Vec<JsFullEdge>) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let core_edges: Vec<(NodeId, ETypeId, NodeId)> = edges
.into_iter()
.map(|edge| {
(
edge.src as NodeId,
edge.etype as ETypeId,
edge.dst as NodeId,
)
})
.collect();
db.add_edges_batch(&core_edges)
.map_err(|e| Error::from_reason(format!("Failed to add edges: {e}")))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn add_edges_with_props_batch(&self, edges: Vec<JsEdgeWithPropsInput>) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let core_edges: Vec<CoreEdgeWithProps> = edges
.into_iter()
.map(|edge| {
let props = edge
.props
.into_iter()
.map(|prop| (prop.key_id as PropKeyId, prop.value.into()))
.collect();
(
edge.src as NodeId,
edge.etype as ETypeId,
edge.dst as NodeId,
props,
)
})
.collect();
db.add_edges_with_props_batch(core_edges)
.map_err(|e| Error::from_reason(format!("Failed to add edges: {e}")))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn add_edge_by_name(&self, src: i64, etype_name: String, dst: i64) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.add_edge_by_name(src as NodeId, &etype_name, dst as NodeId)
.map_err(|e| Error::from_reason(format!("Failed to add edge: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn upsert_edge(
&self,
src: i64,
etype: u32,
dst: i64,
props: Vec<JsNodeProp>,
) -> Result<bool> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let updates: Vec<(PropKeyId, Option<PropValue>)> = props
.into_iter()
.map(|prop| {
let value_opt = match prop.value.prop_type {
PropType::Null => None,
_ => Some(prop.value.into()),
};
(prop.key_id as PropKeyId, value_opt)
})
.collect();
db.upsert_edge_with_props(src as NodeId, etype as ETypeId, dst as NodeId, updates)
.map_err(|e| Error::from_reason(format!("Failed to upsert edge: {e}")))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn delete_edge(&self, src: i64, etype: u32, dst: i64) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.delete_edge(src as NodeId, etype as ETypeId, dst as NodeId)
.map_err(|e| Error::from_reason(format!("Failed to delete edge: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn edge_exists(&self, src: i64, etype: u32, dst: i64) -> Result<bool> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
Ok(db.edge_exists(src as NodeId, etype as ETypeId, dst as NodeId))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_out_edges")]
pub fn out_edges(&self, node_id: i64) -> Result<Vec<JsEdge>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
db.out_edges(node_id as NodeId)
.into_iter()
.map(|(etype, dst)| JsEdge {
etype,
node_id: dst as i64,
})
.collect(),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_in_edges")]
pub fn in_edges(&self, node_id: i64) -> Result<Vec<JsEdge>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
db.in_edges(node_id as NodeId)
.into_iter()
.map(|(etype, src)| JsEdge {
etype,
node_id: src as i64,
})
.collect(),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_out_degree")]
pub fn out_degree(&self, node_id: i64) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.out_degree(node_id as NodeId) as i64),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_in_degree")]
pub fn in_degree(&self, node_id: i64) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.in_degree(node_id as NodeId) as i64),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn count_edges(&self) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.count_edges() as i64),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn list_edges(&self, etype: Option<u32>) -> Result<Vec<JsFullEdge>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
db.list_edges(etype)
.into_iter()
.map(|e| JsFullEdge {
src: e.src as i64,
etype: e.etype,
dst: e.dst as i64,
})
.collect(),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn list_edges_by_name(&self, etype_name: String) -> Result<Vec<JsFullEdge>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let etype = db
.etype_id(&etype_name)
.ok_or_else(|| Error::from_reason(format!("Unknown edge type: {etype_name}")))?;
Ok(
db.list_edges(Some(etype))
.into_iter()
.map(|e| JsFullEdge {
src: e.src as i64,
etype: e.etype,
dst: e.dst as i64,
})
.collect(),
)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn count_edges_by_type(&self, etype: u32) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.count_edges_by_type(etype) as i64),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn count_edges_by_name(&self, etype_name: String) -> Result<i64> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let etype = db
.etype_id(&etype_name)
.ok_or_else(|| Error::from_reason(format!("Unknown edge type: {etype_name}")))?;
Ok(db.count_edges_by_type(etype) as i64)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn stream_nodes(&self, options: Option<StreamOptions>) -> Result<Vec<Vec<i64>>> {
let options = options.unwrap_or_default().into_rust()?;
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
streaming::stream_nodes_single(db, options)
.into_iter()
.map(|batch| batch.into_iter().map(|id| id as i64).collect())
.collect(),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn stream_nodes_with_props(
&self,
options: Option<StreamOptions>,
) -> Result<Vec<Vec<NodeWithProps>>> {
let options = options.unwrap_or_default().into_rust()?;
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let batches = streaming::stream_nodes_single(db, options);
Ok(
batches
.into_iter()
.map(|batch| {
batch
.into_iter()
.map(|node_id| {
let key = db.node_key(node_id as NodeId);
let props = db.node_props(node_id as NodeId).unwrap_or_default();
let props = props
.into_iter()
.map(|(k, v)| JsNodeProp {
key_id: k,
value: v.into(),
})
.collect();
NodeWithProps {
id: node_id as i64,
key,
props,
}
})
.collect()
})
.collect(),
)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn stream_edges(&self, options: Option<StreamOptions>) -> Result<Vec<Vec<JsFullEdge>>> {
let options = options.unwrap_or_default().into_rust()?;
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
streaming::stream_edges_single(db, options)
.into_iter()
.map(|batch| {
batch
.into_iter()
.map(|edge| JsFullEdge {
src: edge.src as i64,
etype: edge.etype,
dst: edge.dst as i64,
})
.collect()
})
.collect(),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn stream_edges_with_props(
&self,
options: Option<StreamOptions>,
) -> Result<Vec<Vec<EdgeWithProps>>> {
let options = options.unwrap_or_default().into_rust()?;
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let batches = streaming::stream_edges_single(db, options);
Ok(
batches
.into_iter()
.map(|batch| {
batch
.into_iter()
.map(|edge| {
let props = db
.edge_props(edge.src, edge.etype, edge.dst)
.unwrap_or_default();
let props = props
.into_iter()
.map(|(k, v)| JsNodeProp {
key_id: k,
value: v.into(),
})
.collect();
EdgeWithProps {
src: edge.src as i64,
etype: edge.etype,
dst: edge.dst as i64,
props,
}
})
.collect()
})
.collect(),
)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_nodes_page")]
pub fn nodes_page(&self, options: Option<PaginationOptions>) -> Result<NodePage> {
let options = options.unwrap_or_default().into_rust()?;
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let page = streaming::nodes_page_single(db, options);
Ok(NodePage {
items: page.items.into_iter().map(|id| id as i64).collect(),
next_cursor: page.next_cursor,
has_more: page.has_more,
total: Some(db.count_nodes() as i64),
})
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_edges_page")]
pub fn edges_page(&self, options: Option<PaginationOptions>) -> Result<EdgePage> {
let options = options.unwrap_or_default().into_rust()?;
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let page = streaming::edges_page_single(db, options);
Ok(EdgePage {
items: page
.items
.into_iter()
.map(|edge| JsFullEdge {
src: edge.src as i64,
etype: edge.etype,
dst: edge.dst as i64,
})
.collect(),
next_cursor: page.next_cursor,
has_more: page.has_more,
total: Some(db.count_edges() as i64),
})
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn set_node_prop(&self, node_id: i64, key_id: u32, value: JsPropValue) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.set_node_prop(node_id as NodeId, key_id as PropKeyId, value.into())
.map_err(|e| Error::from_reason(format!("Failed to set property: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn set_node_prop_by_name(
&self,
node_id: i64,
key_name: String,
value: JsPropValue,
) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.set_node_prop_by_name(node_id as NodeId, &key_name, value.into())
.map_err(|e| Error::from_reason(format!("Failed to set property: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn delete_node_prop(&self, node_id: i64, key_id: u32) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.delete_node_prop(node_id as NodeId, key_id as PropKeyId)
.map_err(|e| Error::from_reason(format!("Failed to delete property: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_node_prop")]
pub fn node_prop(&self, node_id: i64, key_id: u32) -> Result<Option<JsPropValue>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
db.node_prop(node_id as NodeId, key_id as PropKeyId)
.map(|v| v.into()),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_node_props")]
pub fn node_props(&self, node_id: i64) -> Result<Option<Vec<JsNodeProp>>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.node_props(node_id as NodeId).map(|props| {
props
.into_iter()
.map(|(k, v)| JsNodeProp {
key_id: k,
value: v.into(),
})
.collect()
})),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn set_edge_prop(
&self,
src: i64,
etype: u32,
dst: i64,
key_id: u32,
value: JsPropValue,
) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.set_edge_prop(
src as NodeId,
etype as ETypeId,
dst as NodeId,
key_id as PropKeyId,
value.into(),
)
.map_err(|e| Error::from_reason(format!("Failed to set edge property: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn set_edge_prop_by_name(
&self,
src: i64,
etype: u32,
dst: i64,
key_name: String,
value: JsPropValue,
) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.set_edge_prop_by_name(
src as NodeId,
etype as ETypeId,
dst as NodeId,
&key_name,
value.into(),
)
.map_err(|e| Error::from_reason(format!("Failed to set edge property: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn delete_edge_prop(&self, src: i64, etype: u32, dst: i64, key_id: u32) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.delete_edge_prop(
src as NodeId,
etype as ETypeId,
dst as NodeId,
key_id as PropKeyId,
)
.map_err(|e| Error::from_reason(format!("Failed to delete edge property: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_edge_prop")]
pub fn edge_prop(
&self,
src: i64,
etype: u32,
dst: i64,
key_id: u32,
) -> Result<Option<JsPropValue>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
db.edge_prop(
src as NodeId,
etype as ETypeId,
dst as NodeId,
key_id as PropKeyId,
)
.map(|v| v.into()),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_edge_props")]
pub fn edge_props(&self, src: i64, etype: u32, dst: i64) -> Result<Option<Vec<JsNodeProp>>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
db.edge_props(src as NodeId, etype as ETypeId, dst as NodeId)
.map(|props| {
props
.into_iter()
.map(|(k, v)| JsNodeProp {
key_id: k,
value: v.into(),
})
.collect()
}),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn set_node_vector(&self, node_id: i64, prop_key_id: u32, vector: Vec<f64>) -> Result<()> {
let vector_f32: Vec<f32> = vector.iter().map(|&v| v as f32).collect();
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.set_node_vector(node_id as NodeId, prop_key_id as PropKeyId, &vector_f32)
.map_err(|e| Error::from_reason(format!("Failed to set vector: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_node_vector")]
pub fn node_vector(&self, node_id: i64, prop_key_id: u32) -> Result<Option<Vec<f64>>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
db.node_vector(node_id as NodeId, prop_key_id as PropKeyId)
.map(|v| v.iter().map(|&f| f as f64).collect()),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn delete_node_vector(&self, node_id: i64, prop_key_id: u32) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.delete_node_vector(node_id as NodeId, prop_key_id as PropKeyId)
.map_err(|e| Error::from_reason(format!("Failed to delete vector: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn has_node_vector(&self, node_id: i64, prop_key_id: u32) -> Result<bool> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
Ok(db.has_node_vector(node_id as NodeId, prop_key_id as PropKeyId))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_or_create_label")]
pub fn ensure_label(&self, name: String) -> Result<u32> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.label_id_or_create(&name)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_label_id")]
pub fn label_id(&self, name: String) -> Result<Option<u32>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.label_id(&name)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_label_name")]
pub fn label_name(&self, id: u32) -> Result<Option<String>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.label_name(id)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_or_create_etype")]
pub fn ensure_etype(&self, name: String) -> Result<u32> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.etype_id_or_create(&name)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_etype_id")]
pub fn etype_id(&self, name: String) -> Result<Option<u32>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.etype_id(&name)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_etype_name")]
pub fn etype_name(&self, id: u32) -> Result<Option<String>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.etype_name(id)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_or_create_propkey")]
pub fn ensure_propkey(&self, name: String) -> Result<u32> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.propkey_id_or_create(&name)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_propkey_id")]
pub fn propkey_id(&self, name: String) -> Result<Option<u32>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.propkey_id(&name)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_propkey_name")]
pub fn propkey_name(&self, id: u32) -> Result<Option<String>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.propkey_name(id)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn define_label(&self, name: String) -> Result<u32> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.define_label(&name)
.map_err(|e| Error::from_reason(format!("Failed to define label: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn add_node_label(&self, node_id: i64, label_id: u32) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.add_node_label(node_id as NodeId, label_id)
.map_err(|e| Error::from_reason(format!("Failed to add label: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn add_node_label_by_name(&self, node_id: i64, label_name: String) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.add_node_label_by_name(node_id as NodeId, &label_name)
.map_err(|e| Error::from_reason(format!("Failed to add label: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn remove_node_label(&self, node_id: i64, label_id: u32) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.remove_node_label(node_id as NodeId, label_id)
.map_err(|e| Error::from_reason(format!("Failed to remove label: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn node_has_label(&self, node_id: i64, label_id: u32) -> Result<bool> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.node_has_label(node_id as NodeId, label_id)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "get_node_labels")]
pub fn node_labels(&self, node_id: i64) -> Result<Vec<u32>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.node_labels(node_id as NodeId)),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn traverse_single(
&self,
start_nodes: Vec<i64>,
direction: JsTraversalDirection,
edge_type: Option<u32>,
) -> Result<Vec<JsTraversalResult>> {
let start: Vec<NodeId> = start_nodes.iter().map(|&id| id as NodeId).collect();
let etype = edge_type;
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let builder = match direction {
JsTraversalDirection::Out => RustTraversalBuilder::new(start).out(etype),
JsTraversalDirection::In => RustTraversalBuilder::new(start).r#in(etype),
JsTraversalDirection::Both => RustTraversalBuilder::new(start).both(etype),
};
Ok(
builder
.execute(|node_id, dir, etype| neighbors_from_single_file(db, node_id, dir, etype))
.map(JsTraversalResult::from)
.collect(),
)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn traverse(
&self,
start_nodes: Vec<i64>,
steps: Vec<JsTraversalStep>,
limit: Option<u32>,
) -> Result<Vec<JsTraversalResult>> {
let start: Vec<NodeId> = start_nodes.iter().map(|&id| id as NodeId).collect();
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let mut builder = RustTraversalBuilder::new(start);
for step in steps {
let etype = step.edge_type;
builder = match step.direction {
JsTraversalDirection::Out => builder.out(etype),
JsTraversalDirection::In => builder.r#in(etype),
JsTraversalDirection::Both => builder.both(etype),
};
}
if let Some(n) = limit {
builder = builder.take(n as usize);
}
Ok(
builder
.execute(|node_id, dir, etype| neighbors_from_single_file(db, node_id, dir, etype))
.map(JsTraversalResult::from)
.collect(),
)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn traverse_depth(
&self,
start_nodes: Vec<i64>,
edge_type: Option<u32>,
options: JsTraverseOptions,
) -> Result<Vec<JsTraversalResult>> {
let start: Vec<NodeId> = start_nodes.iter().map(|&id| id as NodeId).collect();
let opts: TraverseOptions = options.into();
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
RustTraversalBuilder::new(start)
.traverse(edge_type, opts)
.execute(|node_id, dir, etype| neighbors_from_single_file(db, node_id, dir, etype))
.map(JsTraversalResult::from)
.collect(),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn traverse_count(&self, start_nodes: Vec<i64>, steps: Vec<JsTraversalStep>) -> Result<u32> {
let start: Vec<NodeId> = start_nodes.iter().map(|&id| id as NodeId).collect();
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let mut builder = RustTraversalBuilder::new(start);
for step in steps {
let etype = step.edge_type;
builder = match step.direction {
JsTraversalDirection::Out => builder.out(etype),
JsTraversalDirection::In => builder.r#in(etype),
JsTraversalDirection::Both => builder.both(etype),
};
}
Ok(
builder.count(|node_id, dir, etype| neighbors_from_single_file(db, node_id, dir, etype))
as u32,
)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn traverse_node_ids(
&self,
start_nodes: Vec<i64>,
steps: Vec<JsTraversalStep>,
limit: Option<u32>,
) -> Result<Vec<i64>> {
let start: Vec<NodeId> = start_nodes.iter().map(|&id| id as NodeId).collect();
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let mut builder = RustTraversalBuilder::new(start);
for step in steps {
let etype = step.edge_type;
builder = match step.direction {
JsTraversalDirection::Out => builder.out(etype),
JsTraversalDirection::In => builder.r#in(etype),
JsTraversalDirection::Both => builder.both(etype),
};
}
if let Some(n) = limit {
builder = builder.take(n as usize);
}
Ok(
builder
.collect_node_ids(|node_id, dir, etype| {
neighbors_from_single_file(db, node_id, dir, etype)
})
.into_iter()
.map(|id| id as i64)
.collect(),
)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn dijkstra(&self, config: JsPathConfig) -> Result<JsPathResult> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let weight_key = resolve_weight_key_single_file(db, &config)?;
let rust_config: PathConfig = config.into();
Ok(
dijkstra(
rust_config,
|node_id, dir, etype| neighbors_from_single_file(db, node_id, dir, etype),
|src, etype, dst| edge_weight_from_single_file(db, src, etype, dst, weight_key),
)
.into(),
)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn bfs(&self, config: JsPathConfig) -> Result<JsPathResult> {
let rust_config: PathConfig = config.into();
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(
bfs(rust_config, |node_id, dir, etype| {
neighbors_from_single_file(db, node_id, dir, etype)
})
.into(),
),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn k_shortest(&self, config: JsPathConfig, k: u32) -> Result<Vec<JsPathResult>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let weight_key = resolve_weight_key_single_file(db, &config)?;
let rust_config: PathConfig = config.into();
Ok(
yen_k_shortest(
rust_config,
k as usize,
|node_id, dir, etype| neighbors_from_single_file(db, node_id, dir, etype),
|src, etype, dst| edge_weight_from_single_file(db, src, etype, dst, weight_key),
)
.into_iter()
.map(JsPathResult::from)
.collect(),
)
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn shortest_path(
&self,
source: i64,
target: i64,
edge_type: Option<u32>,
max_depth: Option<u32>,
) -> Result<JsPathResult> {
let config = JsPathConfig {
source,
target: Some(target),
targets: None,
allowed_edge_types: edge_type.map(|e| vec![e]),
weight_key_id: None,
weight_key_name: None,
direction: Some(JsTraversalDirection::Out),
max_depth,
};
self.dijkstra(config)
}
#[napi]
pub fn has_path(
&self,
source: i64,
target: i64,
edge_type: Option<u32>,
max_depth: Option<u32>,
) -> Result<bool> {
Ok(
self
.shortest_path(source, target, edge_type, max_depth)?
.found,
)
}
#[napi]
pub fn reachable_nodes(
&self,
source: i64,
max_depth: u32,
edge_type: Option<u32>,
) -> Result<Vec<i64>> {
let opts = JsTraverseOptions {
direction: Some(JsTraversalDirection::Out),
min_depth: Some(1),
max_depth,
unique: Some(true),
};
Ok(
self
.traverse_depth(vec![source], edge_type, opts)?
.into_iter()
.map(|r| r.node_id)
.collect(),
)
}
#[napi]
pub fn checkpoint(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.checkpoint()
.map_err(|e| Error::from_reason(format!("Failed to checkpoint: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn background_checkpoint(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.background_checkpoint()
.map_err(|e| Error::from_reason(format!("Failed to background checkpoint: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn should_checkpoint(&self, threshold: Option<f64>) -> Result<bool> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.should_checkpoint(threshold.unwrap_or(0.8))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn optimize(&mut self) -> Result<()> {
match self.inner.as_mut() {
Some(DatabaseInner::SingleFile(db)) => db
.optimize_single_file(None)
.map_err(|e| Error::from_reason(format!("Failed to optimize: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "optimizeSingleFile")]
pub fn optimize_single_file(&mut self, options: Option<SingleFileOptimizeOptions>) -> Result<()> {
match self.inner.as_mut() {
Some(DatabaseInner::SingleFile(db)) => db
.optimize_single_file(options.map(Into::into))
.map_err(|e| Error::from_reason(format!("Failed to optimize single-file: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn vacuum(&mut self, options: Option<VacuumOptions>) -> Result<()> {
match self.inner.as_mut() {
Some(DatabaseInner::SingleFile(db)) => db
.vacuum_single_file(options.map(Into::into))
.map_err(|e| Error::from_reason(format!("Failed to vacuum: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(js_name = "vacuumSingleFile")]
pub fn vacuum_single_file(&mut self, options: Option<VacuumOptions>) -> Result<()> {
self.vacuum(options)
}
#[napi(js_name = "resizeWal")]
pub fn resize_wal(&mut self, size_bytes: i64, options: Option<ResizeWalOptions>) -> Result<()> {
if size_bytes <= 0 {
return Err(Error::from_reason("sizeBytes must be greater than 0"));
}
match self.inner.as_mut() {
Some(DatabaseInner::SingleFile(db)) => db
.resize_wal(size_bytes as usize, options.map(Into::into))
.map_err(|e| Error::from_reason(format!("Failed to resize WAL: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn stats(&self) -> Result<DbStats> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let s = db.stats();
Ok(DbStats {
snapshot_gen: s.snapshot_gen as i64,
snapshot_nodes: s.snapshot_nodes as i64,
snapshot_edges: s.snapshot_edges as i64,
snapshot_max_node_id: s.snapshot_max_node_id as i64,
delta_nodes_created: s.delta_nodes_created as i64,
delta_nodes_deleted: s.delta_nodes_deleted as i64,
delta_edges_added: s.delta_edges_added as i64,
delta_edges_deleted: s.delta_edges_deleted as i64,
wal_segment: s.wal_segment as i64,
wal_bytes: s.wal_bytes as i64,
recommend_compact: s.recommend_compact,
mvcc_stats: s.mvcc_stats.map(|stats| MvccStats {
active_transactions: stats.active_transactions as i64,
min_active_ts: stats.min_active_ts as i64,
versions_pruned: stats.versions_pruned as i64,
gc_runs: stats.gc_runs as i64,
last_gc_time: stats.last_gc_time as i64,
committed_writes_size: stats.committed_writes_size as i64,
committed_writes_pruned: stats.committed_writes_pruned as i64,
}),
})
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn check(&self) -> Result<CheckResult> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(CheckResult::from(db.check())),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn export_to_object(&self, options: Option<ExportOptions>) -> Result<serde_json::Value> {
let opts = options.unwrap_or(ExportOptions {
include_nodes: None,
include_edges: None,
include_schema: None,
pretty: None,
});
let opts = opts.into_rust();
let data = match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => ray_export::export_to_object_single(db, opts)
.map_err(|e| Error::from_reason(e.to_string()))?,
None => return Err(Error::from_reason("Database is closed")),
};
serde_json::to_value(data).map_err(|e| Error::from_reason(e.to_string()))
}
#[napi]
pub fn export_to_json(
&self,
path: String,
options: Option<ExportOptions>,
) -> Result<ExportResult> {
let opts = options.unwrap_or(ExportOptions {
include_nodes: None,
include_edges: None,
include_schema: None,
pretty: None,
});
let rust_opts = opts.into_rust();
let data = match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
ray_export::export_to_object_single(db, rust_opts.clone())
.map_err(|e| Error::from_reason(e.to_string()))?
}
None => return Err(Error::from_reason("Database is closed")),
};
let result = ray_export::export_to_json(&data, path, rust_opts.pretty)
.map_err(|e| Error::from_reason(e.to_string()))?;
Ok(ExportResult {
node_count: result.node_count as i64,
edge_count: result.edge_count as i64,
})
}
#[napi]
pub fn export_to_jsonl(
&self,
path: String,
options: Option<ExportOptions>,
) -> Result<ExportResult> {
let opts = options.unwrap_or(ExportOptions {
include_nodes: None,
include_edges: None,
include_schema: None,
pretty: None,
});
let rust_opts = opts.into_rust();
let data = match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => ray_export::export_to_object_single(db, rust_opts)
.map_err(|e| Error::from_reason(e.to_string()))?,
None => return Err(Error::from_reason("Database is closed")),
};
let result =
ray_export::export_to_jsonl(&data, path).map_err(|e| Error::from_reason(e.to_string()))?;
Ok(ExportResult {
node_count: result.node_count as i64,
edge_count: result.edge_count as i64,
})
}
#[napi]
pub fn import_from_object(
&self,
data: serde_json::Value,
options: Option<ImportOptions>,
) -> Result<ImportResult> {
let opts = options.unwrap_or(ImportOptions {
skip_existing: None,
batch_size: None,
});
let rust_opts = opts.into_rust();
let parsed: ray_export::ExportedDatabase =
serde_json::from_value(data).map_err(|e| Error::from_reason(e.to_string()))?;
let result = match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
ray_export::import_from_object_single(db, &parsed, rust_opts)
.map_err(|e| Error::from_reason(e.to_string()))?
}
None => return Err(Error::from_reason("Database is closed")),
};
Ok(ImportResult {
node_count: result.node_count as i64,
edge_count: result.edge_count as i64,
skipped: result.skipped as i64,
})
}
#[napi]
pub fn import_from_json(
&self,
path: String,
options: Option<ImportOptions>,
) -> Result<ImportResult> {
let opts = options.unwrap_or(ImportOptions {
skip_existing: None,
batch_size: None,
});
let rust_opts = opts.into_rust();
let parsed =
ray_export::import_from_json(path).map_err(|e| Error::from_reason(e.to_string()))?;
let result = match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
ray_export::import_from_object_single(db, &parsed, rust_opts)
.map_err(|e| Error::from_reason(e.to_string()))?
}
None => return Err(Error::from_reason("Database is closed")),
};
Ok(ImportResult {
node_count: result.node_count as i64,
edge_count: result.edge_count as i64,
skipped: result.skipped as i64,
})
}
#[napi]
pub fn cache_is_enabled(&self) -> Result<bool> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.cache_is_enabled()),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn cache_invalidate_node(&self, node_id: i64) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
db.cache_invalidate_node(node_id as NodeId);
Ok(())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn cache_invalidate_edge(&self, src: i64, etype: u32, dst: i64) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
db.cache_invalidate_edge(src as NodeId, etype as ETypeId, dst as NodeId);
Ok(())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn cache_invalidate_key(&self, key: String) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
db.cache_invalidate_key(&key);
Ok(())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn cache_clear(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
db.cache_clear();
Ok(())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn cache_clear_query(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
db.cache_clear_query();
Ok(())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn cache_clear_key(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
db.cache_clear_key();
Ok(())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn cache_clear_property(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
db.cache_clear_property();
Ok(())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn cache_clear_traversal(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
db.cache_clear_traversal();
Ok(())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn cache_stats(&self) -> Result<Option<JsCacheStats>> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db.cache_stats().map(|s| JsCacheStats {
property_cache_hits: s.property_cache_hits as i64,
property_cache_misses: s.property_cache_misses as i64,
property_cache_size: s.property_cache_size as i64,
traversal_cache_hits: s.traversal_cache_hits as i64,
traversal_cache_misses: s.traversal_cache_misses as i64,
traversal_cache_size: s.traversal_cache_size as i64,
query_cache_hits: s.query_cache_hits as i64,
query_cache_misses: s.query_cache_misses as i64,
query_cache_size: s.query_cache_size as i64,
})),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn cache_reset_stats(&self) -> Result<()> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
db.cache_reset_stats();
Ok(())
}
None => Err(Error::from_reason("Database is closed")),
}
}
fn db(&self) -> Result<&RustSingleFileDB> {
match self.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(db),
None => Err(Error::from_reason("Database is closed")),
}
}
}
fn neighbors_from_single_file(
db: &RustSingleFileDB,
node_id: NodeId,
direction: TraversalDirection,
etype: Option<ETypeId>,
) -> Vec<Edge> {
let mut edges = Vec::new();
match direction {
TraversalDirection::Out => {
for (e, dst) in db.out_edges(node_id) {
if etype.is_none() || etype == Some(e) {
edges.push(Edge {
src: node_id,
etype: e,
dst,
});
}
}
}
TraversalDirection::In => {
for (e, src) in db.in_edges(node_id) {
if etype.is_none() || etype == Some(e) {
edges.push(Edge {
src,
etype: e,
dst: node_id,
});
}
}
}
TraversalDirection::Both => {
edges.extend(neighbors_from_single_file(
db,
node_id,
TraversalDirection::Out,
etype,
));
edges.extend(neighbors_from_single_file(
db,
node_id,
TraversalDirection::In,
etype,
));
}
}
edges
}
fn resolve_weight_key_single_file(
db: &RustSingleFileDB,
config: &JsPathConfig,
) -> Result<Option<PropKeyId>> {
if let Some(key_id) = config.weight_key_id {
return Ok(Some(key_id as PropKeyId));
}
if let Some(ref key_name) = config.weight_key_name {
let key_id = db
.propkey_id(key_name)
.ok_or_else(|| Error::from_reason(format!("Unknown property key: {key_name}")))?;
return Ok(Some(key_id));
}
Ok(None)
}
fn prop_value_to_weight(value: Option<PropValue>) -> f64 {
let weight = match value {
Some(PropValue::Bool(v)) => {
if v {
1.0
} else {
0.0
}
}
Some(PropValue::I64(v)) => v as f64,
Some(PropValue::F64(v)) => v,
Some(PropValue::String(v)) => v.parse::<f64>().unwrap_or(1.0),
Some(PropValue::VectorF32(_)) => 1.0,
Some(PropValue::Null) | None => 1.0,
};
if weight.is_finite() && weight > 0.0 {
weight
} else {
1.0
}
}
fn edge_weight_from_single_file(
db: &RustSingleFileDB,
src: NodeId,
etype: ETypeId,
dst: NodeId,
weight_key: Option<PropKeyId>,
) -> f64 {
match weight_key {
Some(key_id) => prop_value_to_weight(db.edge_prop(src, etype, dst, key_id)),
None => 1.0,
}
}
#[napi]
pub fn open_database(path: String, options: Option<OpenOptions>) -> Result<Database> {
Database::open(path, options)
}
#[napi]
pub fn recommended_safe_profile() -> RuntimeProfile {
runtime_profile_from_rust(RustKiteRuntimeProfile::safe())
}
#[napi]
pub fn recommended_balanced_profile() -> RuntimeProfile {
runtime_profile_from_rust(RustKiteRuntimeProfile::balanced())
}
#[napi]
pub fn recommended_reopen_heavy_profile() -> RuntimeProfile {
runtime_profile_from_rust(RustKiteRuntimeProfile::reopen_heavy())
}
#[napi]
pub fn collect_metrics(db: &Database) -> Result<DatabaseMetrics> {
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(core_metrics::collect_metrics_single_file(db).into()),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn collect_replication_metrics_prometheus(db: &Database) -> Result<String> {
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
Ok(core_metrics::collect_replication_metrics_prometheus_single_file(db))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn collect_replication_metrics_otel_json(db: &Database) -> Result<String> {
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
Ok(core_metrics::collect_replication_metrics_otel_json_single_file(db))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn collect_replication_metrics_otel_protobuf(db: &Database) -> Result<Buffer> {
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
Ok(core_metrics::collect_replication_metrics_otel_protobuf_single_file(db).into())
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn collect_replication_snapshot_transport_json(
db: &Database,
include_data: Option<bool>,
) -> Result<String> {
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.primary_export_snapshot_transport_json(include_data.unwrap_or(false))
.map_err(|e| Error::from_reason(format!("Failed to export replication snapshot: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn collect_replication_log_transport_json(
db: &Database,
cursor: Option<String>,
max_frames: Option<i64>,
max_bytes: Option<i64>,
include_payload: Option<bool>,
) -> Result<String> {
let max_frames = max_frames.unwrap_or(128);
let max_bytes = max_bytes.unwrap_or(1_048_576);
if max_frames <= 0 {
return Err(Error::from_reason("maxFrames must be positive"));
}
if max_bytes <= 0 {
return Err(Error::from_reason("maxBytes must be positive"));
}
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => db
.primary_export_log_transport_json(
cursor.as_deref(),
max_frames as usize,
max_bytes as usize,
include_payload.unwrap_or(true),
)
.map_err(|e| Error::from_reason(format!("Failed to export replication log: {e}"))),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn push_replication_metrics_otel_json(
db: &Database,
endpoint: String,
timeout_ms: i64,
bearer_token: Option<String>,
) -> Result<OtlpHttpExportResult> {
if timeout_ms <= 0 {
return Err(Error::from_reason("timeoutMs must be positive"));
}
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
core_metrics::push_replication_metrics_otel_json_single_file(
db,
&endpoint,
timeout_ms as u64,
bearer_token.as_deref(),
)
.map(Into::into)
.map_err(|e| Error::from_reason(format!("Failed to push replication metrics: {e}")))
}
None => Err(Error::from_reason("Database is closed")),
}
}
fn build_core_otel_push_options(
options: PushReplicationMetricsOtelOptions,
) -> Result<core_metrics::OtlpHttpPushOptions> {
let timeout_ms = options.timeout_ms.unwrap_or(5_000);
if timeout_ms <= 0 {
return Err(Error::from_reason("timeoutMs must be positive"));
}
let retry_max_attempts = options.retry_max_attempts.unwrap_or(1);
if retry_max_attempts <= 0 {
return Err(Error::from_reason("retryMaxAttempts must be positive"));
}
let retry_backoff_ms = options.retry_backoff_ms.unwrap_or(100);
if retry_backoff_ms < 0 {
return Err(Error::from_reason("retryBackoffMs must be non-negative"));
}
let retry_backoff_max_ms = options.retry_backoff_max_ms.unwrap_or(2_000);
if retry_backoff_max_ms < 0 {
return Err(Error::from_reason("retryBackoffMaxMs must be non-negative"));
}
if retry_backoff_max_ms > 0 && retry_backoff_max_ms < retry_backoff_ms {
return Err(Error::from_reason(
"retryBackoffMaxMs must be >= retryBackoffMs when non-zero",
));
}
let retry_jitter_ratio = options.retry_jitter_ratio.unwrap_or(0.0);
if !(0.0..=1.0).contains(&retry_jitter_ratio) {
return Err(Error::from_reason(
"retryJitterRatio must be within [0.0, 1.0]",
));
}
let adaptive_retry_mode = match options
.adaptive_retry_mode
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
None => core_metrics::OtlpAdaptiveRetryMode::Linear,
Some(value) if value.eq_ignore_ascii_case("linear") => {
core_metrics::OtlpAdaptiveRetryMode::Linear
}
Some(value) if value.eq_ignore_ascii_case("ewma") => core_metrics::OtlpAdaptiveRetryMode::Ewma,
Some(_) => {
return Err(Error::from_reason(
"adaptiveRetryMode must be one of: linear, ewma",
));
}
};
let adaptive_retry_ewma_alpha = options.adaptive_retry_ewma_alpha.unwrap_or(0.3);
if !(0.0..=1.0).contains(&adaptive_retry_ewma_alpha) {
return Err(Error::from_reason(
"adaptiveRetryEwmaAlpha must be within [0.0, 1.0]",
));
}
let circuit_breaker_failure_threshold = options.circuit_breaker_failure_threshold.unwrap_or(0);
if circuit_breaker_failure_threshold < 0 {
return Err(Error::from_reason(
"circuitBreakerFailureThreshold must be non-negative",
));
}
let circuit_breaker_open_ms = options.circuit_breaker_open_ms.unwrap_or(0);
if circuit_breaker_open_ms < 0 {
return Err(Error::from_reason(
"circuitBreakerOpenMs must be non-negative",
));
}
if circuit_breaker_failure_threshold > 0 && circuit_breaker_open_ms == 0 {
return Err(Error::from_reason(
"circuitBreakerOpenMs must be positive when circuitBreakerFailureThreshold is set",
));
}
let circuit_breaker_half_open_probes = options.circuit_breaker_half_open_probes.unwrap_or(1);
if circuit_breaker_half_open_probes < 0 {
return Err(Error::from_reason(
"circuitBreakerHalfOpenProbes must be non-negative",
));
}
if circuit_breaker_failure_threshold > 0 && circuit_breaker_half_open_probes == 0 {
return Err(Error::from_reason(
"circuitBreakerHalfOpenProbes must be positive when circuitBreakerFailureThreshold is set",
));
}
if let Some(path) = options.circuit_breaker_state_path.as_deref() {
if path.trim().is_empty() {
return Err(Error::from_reason(
"circuitBreakerStatePath must not be empty when provided",
));
}
}
if let Some(url) = options.circuit_breaker_state_url.as_deref() {
let trimmed = url.trim();
if trimmed.is_empty() {
return Err(Error::from_reason(
"circuitBreakerStateUrl must not be empty when provided",
));
}
if !(trimmed.starts_with("http://") || trimmed.starts_with("https://")) {
return Err(Error::from_reason(
"circuitBreakerStateUrl must use http:// or https://",
));
}
if options.https_only.unwrap_or(false) && trimmed.starts_with("http://") {
return Err(Error::from_reason(
"circuitBreakerStateUrl must use https when httpsOnly is enabled",
));
}
}
if options.circuit_breaker_state_path.is_some() && options.circuit_breaker_state_url.is_some() {
return Err(Error::from_reason(
"circuitBreakerStatePath and circuitBreakerStateUrl are mutually exclusive",
));
}
if options.circuit_breaker_state_patch.unwrap_or(false)
&& options.circuit_breaker_state_url.is_none()
{
return Err(Error::from_reason(
"circuitBreakerStatePatch requires circuitBreakerStateUrl",
));
}
if options.circuit_breaker_state_patch_batch.unwrap_or(false)
&& !options.circuit_breaker_state_patch.unwrap_or(false)
{
return Err(Error::from_reason(
"circuitBreakerStatePatchBatch requires circuitBreakerStatePatch",
));
}
if options.circuit_breaker_state_patch_merge.unwrap_or(false)
&& !options.circuit_breaker_state_patch.unwrap_or(false)
{
return Err(Error::from_reason(
"circuitBreakerStatePatchMerge requires circuitBreakerStatePatch",
));
}
let circuit_breaker_state_patch_batch_max_keys = options
.circuit_breaker_state_patch_batch_max_keys
.unwrap_or(8);
if circuit_breaker_state_patch_batch_max_keys <= 0 {
return Err(Error::from_reason(
"circuitBreakerStatePatchBatchMaxKeys must be positive",
));
}
let circuit_breaker_state_patch_merge_max_keys = options
.circuit_breaker_state_patch_merge_max_keys
.unwrap_or(32);
if circuit_breaker_state_patch_merge_max_keys <= 0 {
return Err(Error::from_reason(
"circuitBreakerStatePatchMergeMaxKeys must be positive",
));
}
let circuit_breaker_state_patch_retry_max_attempts = options
.circuit_breaker_state_patch_retry_max_attempts
.unwrap_or(1);
if circuit_breaker_state_patch_retry_max_attempts <= 0 {
return Err(Error::from_reason(
"circuitBreakerStatePatchRetryMaxAttempts must be positive",
));
}
if options.circuit_breaker_state_cas.unwrap_or(false)
&& options.circuit_breaker_state_url.is_none()
{
return Err(Error::from_reason(
"circuitBreakerStateCas requires circuitBreakerStateUrl",
));
}
if let Some(lease_id) = options.circuit_breaker_state_lease_id.as_deref() {
if lease_id.trim().is_empty() {
return Err(Error::from_reason(
"circuitBreakerStateLeaseId must not be empty when provided",
));
}
if options.circuit_breaker_state_url.is_none() {
return Err(Error::from_reason(
"circuitBreakerStateLeaseId requires circuitBreakerStateUrl",
));
}
}
if let Some(scope_key) = options.circuit_breaker_scope_key.as_deref() {
if scope_key.trim().is_empty() {
return Err(Error::from_reason(
"circuitBreakerScopeKey must not be empty when provided",
));
}
}
Ok(core_metrics::OtlpHttpPushOptions {
timeout_ms: timeout_ms as u64,
bearer_token: options.bearer_token,
retry_max_attempts: retry_max_attempts as u32,
retry_backoff_ms: retry_backoff_ms as u64,
retry_backoff_max_ms: retry_backoff_max_ms as u64,
retry_jitter_ratio,
adaptive_retry_mode,
adaptive_retry_ewma_alpha,
adaptive_retry: options.adaptive_retry.unwrap_or(false),
circuit_breaker_failure_threshold: circuit_breaker_failure_threshold as u32,
circuit_breaker_open_ms: circuit_breaker_open_ms as u64,
circuit_breaker_half_open_probes: circuit_breaker_half_open_probes as u32,
circuit_breaker_state_path: options.circuit_breaker_state_path,
circuit_breaker_state_url: options.circuit_breaker_state_url,
circuit_breaker_state_patch: options.circuit_breaker_state_patch.unwrap_or(false),
circuit_breaker_state_patch_batch: options.circuit_breaker_state_patch_batch.unwrap_or(false),
circuit_breaker_state_patch_batch_max_keys: circuit_breaker_state_patch_batch_max_keys as u32,
circuit_breaker_state_patch_merge: options.circuit_breaker_state_patch_merge.unwrap_or(false),
circuit_breaker_state_patch_merge_max_keys: circuit_breaker_state_patch_merge_max_keys as u32,
circuit_breaker_state_patch_retry_max_attempts: circuit_breaker_state_patch_retry_max_attempts
as u32,
circuit_breaker_state_cas: options.circuit_breaker_state_cas.unwrap_or(false),
circuit_breaker_state_lease_id: options.circuit_breaker_state_lease_id,
circuit_breaker_scope_key: options.circuit_breaker_scope_key,
compression_gzip: options.compression_gzip.unwrap_or(false),
tls: core_metrics::OtlpHttpTlsOptions {
https_only: options.https_only.unwrap_or(false),
ca_cert_pem_path: options.ca_cert_pem_path,
client_cert_pem_path: options.client_cert_pem_path,
client_key_pem_path: options.client_key_pem_path,
},
})
}
#[napi]
pub fn push_replication_metrics_otel_json_with_options(
db: &Database,
endpoint: String,
options: Option<PushReplicationMetricsOtelOptions>,
) -> Result<OtlpHttpExportResult> {
let core_options = build_core_otel_push_options(options.unwrap_or_default())?;
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
core_metrics::push_replication_metrics_otel_json_single_file_with_options(
db,
&endpoint,
&core_options,
)
.map(Into::into)
.map_err(|e| Error::from_reason(format!("Failed to push replication metrics: {e}")))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn push_replication_metrics_otel_protobuf(
db: &Database,
endpoint: String,
timeout_ms: i64,
bearer_token: Option<String>,
) -> Result<OtlpHttpExportResult> {
if timeout_ms <= 0 {
return Err(Error::from_reason("timeoutMs must be positive"));
}
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
core_metrics::push_replication_metrics_otel_protobuf_single_file(
db,
&endpoint,
timeout_ms as u64,
bearer_token.as_deref(),
)
.map(Into::into)
.map_err(|e| Error::from_reason(format!("Failed to push replication metrics: {e}")))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn push_replication_metrics_otel_protobuf_with_options(
db: &Database,
endpoint: String,
options: Option<PushReplicationMetricsOtelOptions>,
) -> Result<OtlpHttpExportResult> {
let core_options = build_core_otel_push_options(options.unwrap_or_default())?;
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
core_metrics::push_replication_metrics_otel_protobuf_single_file_with_options(
db,
&endpoint,
&core_options,
)
.map(Into::into)
.map_err(|e| Error::from_reason(format!("Failed to push replication metrics: {e}")))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn push_replication_metrics_otel_grpc(
db: &Database,
endpoint: String,
timeout_ms: i64,
bearer_token: Option<String>,
) -> Result<OtlpHttpExportResult> {
if timeout_ms <= 0 {
return Err(Error::from_reason("timeoutMs must be positive"));
}
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
core_metrics::push_replication_metrics_otel_grpc_single_file(
db,
&endpoint,
timeout_ms as u64,
bearer_token.as_deref(),
)
.map(Into::into)
.map_err(|e| Error::from_reason(format!("Failed to push replication metrics: {e}")))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn push_replication_metrics_otel_grpc_with_options(
db: &Database,
endpoint: String,
options: Option<PushReplicationMetricsOtelOptions>,
) -> Result<OtlpHttpExportResult> {
let core_options = build_core_otel_push_options(options.unwrap_or_default())?;
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
core_metrics::push_replication_metrics_otel_grpc_single_file_with_options(
db,
&endpoint,
&core_options,
)
.map(Into::into)
.map_err(|e| Error::from_reason(format!("Failed to push replication metrics: {e}")))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn health_check(db: &Database) -> Result<HealthCheckResult> {
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => Ok(core_metrics::health_check_single_file(db).into()),
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi(object)]
#[derive(Default, Clone)]
pub struct BackupOptions {
pub checkpoint: Option<bool>,
pub overwrite: Option<bool>,
}
#[napi(object)]
#[derive(Default, Clone)]
pub struct RestoreOptions {
pub overwrite: Option<bool>,
}
#[napi(object)]
#[derive(Default, Clone)]
pub struct OfflineBackupOptions {
pub overwrite: Option<bool>,
}
#[napi(object)]
pub struct BackupResult {
pub path: String,
pub size: i64,
pub timestamp: i64,
pub r#type: String,
}
impl From<BackupOptions> for core_backup::BackupOptions {
fn from(options: BackupOptions) -> Self {
Self {
checkpoint: options.checkpoint.unwrap_or(true),
overwrite: options.overwrite.unwrap_or(false),
}
}
}
impl From<RestoreOptions> for core_backup::RestoreOptions {
fn from(options: RestoreOptions) -> Self {
Self {
overwrite: options.overwrite.unwrap_or(false),
}
}
}
impl From<OfflineBackupOptions> for core_backup::OfflineBackupOptions {
fn from(options: OfflineBackupOptions) -> Self {
Self {
overwrite: options.overwrite.unwrap_or(false),
}
}
}
impl From<core_backup::BackupResult> for BackupResult {
fn from(result: core_backup::BackupResult) -> Self {
BackupResult {
path: result.path,
size: result.size as i64,
timestamp: result.timestamp_ms as i64,
r#type: result.kind,
}
}
}
#[napi]
pub fn create_backup(
db: &Database,
backup_path: String,
options: Option<BackupOptions>,
) -> Result<BackupResult> {
let options = options.unwrap_or_default();
let core_options: core_backup::BackupOptions = options.clone().into();
let backup_path = PathBuf::from(backup_path);
match db.inner.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
core_backup::create_backup_single_file(db, &backup_path, core_options)
.map(BackupResult::from)
.map_err(|e| Error::from_reason(format!("Failed to create backup: {e}")))
}
None => Err(Error::from_reason("Database is closed")),
}
}
#[napi]
pub fn restore_backup(
backup_path: String,
restore_path: String,
options: Option<RestoreOptions>,
) -> Result<String> {
let options = options.unwrap_or_default();
let core_options: core_backup::RestoreOptions = options.into();
core_backup::restore_backup(backup_path, restore_path, core_options)
.map(|p| p.to_string_lossy().to_string())
.map_err(|e| Error::from_reason(format!("Failed to restore backup: {e}")))
}
#[napi]
pub fn backup_info(backup_path: String) -> Result<BackupResult> {
core_backup::backup_info(backup_path)
.map(BackupResult::from)
.map_err(|e| Error::from_reason(format!("Failed to inspect backup: {e}")))
}
#[napi]
pub fn create_offline_backup(
db_path: String,
backup_path: String,
options: Option<OfflineBackupOptions>,
) -> Result<BackupResult> {
let options = options.unwrap_or_default();
let core_options: core_backup::OfflineBackupOptions = options.into();
core_backup::create_offline_backup(db_path, backup_path, core_options)
.map(BackupResult::from)
.map_err(|e| Error::from_reason(format!("Failed to create offline backup: {e}")))
}