use pyo3::exceptions::PyRuntimeError;
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyList};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::RwLock;
use crate::api::kite::KiteRuntimeProfile as RustKiteRuntimeProfile;
use crate::backup as core_backup;
use crate::core::single_file::{
close_single_file, close_single_file_with_options, is_single_file_path, open_single_file,
SingleFileCloseOptions as RustSingleFileCloseOptions, SingleFileDB as RustSingleFileDB,
VacuumOptions as RustVacuumOptions,
};
use crate::metrics as core_metrics;
use crate::replication::types::CommitToken;
use crate::types::{ETypeId, EdgeWithProps as CoreEdgeWithProps, NodeId, PropKeyId};
use super::ops::{
cache, edges, export_import, graph_traversal, labels, maintenance, nodes, properties, schema,
streaming as streaming_ops, transaction, vectors,
};
use super::options::{
BackupOptions, BackupResult, ExportOptions, ExportResult, ImportOptions, ImportResult,
OfflineBackupOptions, OpenOptions, PaginationOptions, RestoreOptions, RuntimeProfile,
SingleFileOptimizeOptions, StreamOptions,
};
use super::stats::{CacheStats, CheckResult, DatabaseMetrics, DbStats, HealthCheckResult};
use super::traversal::{PyPathEdge, PyPathResult, PyTraversalResult};
use super::types::{
Edge, EdgePage, EdgeWithProps, FullEdge, NodePage, NodeProp, NodeWithProps, PropValue,
};
type EdgePropsInput = (i64, u32, i64, Vec<(u32, PropValue)>);
pub(crate) enum DatabaseInner {
SingleFile(Box<RustSingleFileDB>),
}
macro_rules! dispatch {
($self:expr, |$sf:ident| $sf_expr:expr, |$gf:ident| $gf_expr:expr) => {{
let guard = $self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile($sf)) => $sf_expr,
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}};
}
macro_rules! dispatch_ok {
($self:expr, |$sf:ident| $sf_expr:expr, |$gf:ident| $gf_expr:expr) => {{
let guard = $self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile($sf)) => Ok($sf_expr),
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}};
}
macro_rules! dispatch_mut {
($self:expr, |$sf:ident| $sf_expr:expr, |$gf:ident| $gf_expr:expr) => {{
let mut guard = $self
.inner
.write()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_mut() {
Some(DatabaseInner::SingleFile($sf)) => $sf_expr,
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}};
}
macro_rules! dispatch_tx {
($self:expr, |$sf:ident| $sf_expr:expr, |$handle:ident| $gf_expr:expr) => {{
let guard = $self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile($sf)) => $sf_expr,
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}};
}
#[pyclass(name = "Database")]
pub struct PyDatabase {
pub(crate) inner: RwLock<Option<DatabaseInner>>,
}
#[pymethods]
impl PyDatabase {
#[new]
#[pyo3(signature = (path, options=None))]
fn new(path: String, options: Option<OpenOptions>) -> PyResult<Self> {
let options = options.unwrap_or_default();
let path_buf = PathBuf::from(&path);
if path_buf.exists() && path_buf.is_dir() {
return Err(PyRuntimeError::new_err(
"Single-file databases require a file path, not a directory",
));
}
let db_path = if is_single_file_path(&path_buf) {
path_buf
} else if path_buf.extension().is_none() {
PathBuf::from(format!("{path}.kitedb"))
} else {
return Err(PyRuntimeError::new_err(
"Single-file databases must use the .kitedb extension",
));
};
let opts = options
.to_single_file_options()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to parse options: {e}")))?;
let db = open_single_file(&db_path, opts)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to open database: {e}")))?;
Ok(PyDatabase {
inner: RwLock::new(Some(DatabaseInner::SingleFile(Box::new(db)))),
})
}
#[staticmethod]
#[pyo3(signature = (path, options=None))]
fn open(path: String, options: Option<OpenOptions>) -> PyResult<Self> {
Self::new(path, options)
}
fn close(&self) -> PyResult<()> {
let mut guard = self
.inner
.write()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
if let Some(db) = guard.take() {
match db {
DatabaseInner::SingleFile(db) => close_single_file(*db)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to close: {e}")))?,
}
}
Ok(())
}
#[pyo3(signature = (threshold))]
fn close_with_checkpoint_if_wal_over(&self, threshold: f64) -> PyResult<()> {
let mut guard = self
.inner
.write()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
if let Some(db) = guard.take() {
match db {
DatabaseInner::SingleFile(db) => close_single_file_with_options(
*db,
RustSingleFileCloseOptions::new().checkpoint_if_wal_usage_at_least(threshold),
)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to close: {e}")))?,
}
}
Ok(())
}
fn __enter__(slf: PyRef<'_, Self>) -> PyResult<PyRef<'_, Self>> {
Ok(slf)
}
#[pyo3(signature = (_exc_type=None, _exc_value=None, _traceback=None))]
fn __exit__(
&self,
_exc_type: Option<PyObject>,
_exc_value: Option<PyObject>,
_traceback: Option<PyObject>,
) -> PyResult<bool> {
self.close()?;
Ok(false)
}
#[getter]
fn is_open(&self) -> PyResult<bool> {
Ok(
self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?
.is_some(),
)
}
#[getter]
fn path(&self) -> PyResult<String> {
dispatch_ok!(self, |db| db.path.to_string_lossy().to_string(), |db| db
.path
.to_string_lossy()
.to_string())
}
#[getter]
fn read_only(&self) -> PyResult<bool> {
dispatch_ok!(self, |db| db.read_only, |db| db.read_only)
}
#[pyo3(signature = (read_only=None))]
fn begin(&self, read_only: Option<bool>) -> PyResult<i64> {
let read_only = read_only.unwrap_or(false);
dispatch!(
self,
|db| transaction::begin_single_file(db, read_only),
|_db| { unreachable!("multi-file database support removed") }
)
}
fn begin_bulk(&self) -> PyResult<i64> {
dispatch!(self, |db| transaction::begin_bulk_single_file(db), |_db| {
unreachable!("multi-file database support removed")
})
}
fn commit(&self) -> PyResult<()> {
dispatch!(self, |db| transaction::commit_single_file(db), |_db| {
unreachable!("multi-file database support removed")
})
}
fn rollback(&self) -> PyResult<()> {
dispatch!(self, |db| transaction::rollback_single_file(db), |_db| {
unreachable!("multi-file database support removed")
})
}
fn has_transaction(&self) -> PyResult<bool> {
dispatch_ok!(self, |db| db.has_transaction(), |_db| false)
}
fn commit_with_token(&self) -> PyResult<Option<String>> {
dispatch!(
self,
|db| db
.commit_with_token()
.map(|token| token.map(|value| value.to_string()))
.map_err(|e| PyRuntimeError::new_err(format!("Failed to commit: {e}"))),
|_db| { unreachable!("multi-file database support removed") }
)
}
fn wait_for_token(&self, token: String, timeout_ms: i64) -> PyResult<bool> {
if timeout_ms < 0 {
return Err(PyRuntimeError::new_err("timeout_ms must be non-negative"));
}
let token = CommitToken::from_str(&token)
.map_err(|e| PyRuntimeError::new_err(format!("Invalid token: {e}")))?;
dispatch!(
self,
|db| db
.wait_for_token(token, timeout_ms as u64)
.map_err(|e| PyRuntimeError::new_err(format!("Failed waiting for token: {e}"))),
|_db| { unreachable!("multi-file database support removed") }
)
}
fn primary_replication_status(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
let guard = self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let Some(status) = db.primary_replication_status() else {
return Ok(None);
};
let out = PyDict::new_bound(py);
out.set_item("role", status.role.to_string())?;
out.set_item("epoch", status.epoch)?;
out.set_item("head_log_index", status.head_log_index)?;
out.set_item("retained_floor", status.retained_floor)?;
out.set_item(
"sidecar_path",
status.sidecar_path.to_string_lossy().to_string(),
)?;
out.set_item(
"last_token",
status.last_token.map(|token| token.to_string()),
)?;
out.set_item("append_attempts", status.append_attempts)?;
out.set_item("append_failures", status.append_failures)?;
out.set_item("append_successes", status.append_successes)?;
let lags = PyList::empty_bound(py);
for lag in status.replica_lags {
let lag_item = PyDict::new_bound(py);
lag_item.set_item("replica_id", lag.replica_id)?;
lag_item.set_item("epoch", lag.epoch)?;
lag_item.set_item("applied_log_index", lag.applied_log_index)?;
lags.append(lag_item)?;
}
out.set_item("replica_lags", lags)?;
Ok(Some(out.into_py(py)))
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
fn replica_replication_status(&self, py: Python<'_>) -> PyResult<Option<PyObject>> {
let guard = self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let Some(status) = db.replica_replication_status() else {
return Ok(None);
};
let out = PyDict::new_bound(py);
out.set_item("role", status.role.to_string())?;
out.set_item(
"source_db_path",
status
.source_db_path
.map(|path| path.to_string_lossy().to_string()),
)?;
out.set_item(
"source_sidecar_path",
status
.source_sidecar_path
.map(|path| path.to_string_lossy().to_string()),
)?;
out.set_item("applied_epoch", status.applied_epoch)?;
out.set_item("applied_log_index", status.applied_log_index)?;
out.set_item("last_error", status.last_error)?;
out.set_item("needs_reseed", status.needs_reseed)?;
Ok(Some(out.into_py(py)))
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
fn primary_promote_to_next_epoch(&self) -> PyResult<i64> {
dispatch!(
self,
|db| db
.primary_promote_to_next_epoch()
.map(|value| value as i64)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to promote primary: {e}"))),
|_db| { unreachable!("multi-file database support removed") }
)
}
fn primary_report_replica_progress(
&self,
replica_id: String,
epoch: i64,
applied_log_index: i64,
) -> PyResult<()> {
if epoch < 0 || applied_log_index < 0 {
return Err(PyRuntimeError::new_err(
"epoch and applied_log_index must be non-negative",
));
}
dispatch!(
self,
|db| db
.primary_report_replica_progress(&replica_id, epoch as u64, applied_log_index as u64)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to report replica progress: {e}"))),
|_db| { unreachable!("multi-file database support removed") }
)
}
fn primary_run_retention(&self) -> PyResult<(i64, i64)> {
dispatch!(
self,
|db| db
.primary_run_retention()
.map(|outcome| (
outcome.pruned_segments as i64,
outcome.retained_floor as i64
))
.map_err(|e| PyRuntimeError::new_err(format!("Failed to run retention: {e}"))),
|_db| { unreachable!("multi-file database support removed") }
)
}
#[pyo3(signature = (include_data=false))]
fn export_replication_snapshot_transport_json(&self, include_data: bool) -> PyResult<String> {
dispatch!(
self,
|db| db
.primary_export_snapshot_transport_json(include_data)
.map_err(|e| {
PyRuntimeError::new_err(format!("Failed to export replication snapshot: {e}"))
}),
|_db| { unreachable!("multi-file database support removed") }
)
}
#[pyo3(signature = (cursor=None, max_frames=128, max_bytes=1048576, include_payload=true))]
fn export_replication_log_transport_json(
&self,
cursor: Option<String>,
max_frames: i64,
max_bytes: i64,
include_payload: bool,
) -> PyResult<String> {
if max_frames <= 0 {
return Err(PyRuntimeError::new_err("max_frames must be positive"));
}
if max_bytes <= 0 {
return Err(PyRuntimeError::new_err("max_bytes must be positive"));
}
dispatch!(
self,
|db| db
.primary_export_log_transport_json(
cursor.as_deref(),
max_frames as usize,
max_bytes as usize,
include_payload,
)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to export replication log: {e}"))),
|_db| { unreachable!("multi-file database support removed") }
)
}
fn replica_bootstrap_from_snapshot(&self) -> PyResult<()> {
dispatch!(
self,
|db| db
.replica_bootstrap_from_snapshot()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to bootstrap replica: {e}"))),
|_db| { unreachable!("multi-file database support removed") }
)
}
fn replica_catch_up_once(&self, max_frames: i64) -> PyResult<i64> {
if max_frames < 0 {
return Err(PyRuntimeError::new_err("max_frames must be non-negative"));
}
dispatch!(
self,
|db| db
.replica_catch_up_once(max_frames as usize)
.map(|count| count as i64)
.map_err(|e| PyRuntimeError::new_err(format!("Failed replica catch-up: {e}"))),
|_db| { unreachable!("multi-file database support removed") }
)
}
fn replica_reseed_from_snapshot(&self) -> PyResult<()> {
dispatch!(
self,
|db| db
.replica_reseed_from_snapshot()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to reseed replica: {e}"))),
|_db| { unreachable!("multi-file database support removed") }
)
}
#[pyo3(signature = (key=None))]
fn create_node(&self, key: Option<String>) -> PyResult<i64> {
dispatch_tx!(
self,
|db| nodes::create_node_single(db, key.as_deref()),
|h| nodes::create_node_single(h, key.clone())
)
}
fn delete_node(&self, node_id: i64) -> PyResult<()> {
dispatch_tx!(
self,
|db| nodes::delete_node_single(db, node_id as NodeId),
|h| nodes::delete_node_single(h, node_id as NodeId)
)
}
fn node_exists(&self, node_id: i64) -> PyResult<bool> {
dispatch_ok!(
self,
|db| nodes::node_exists_single(db, node_id as NodeId),
|db| nodes::node_exists_single(db, node_id as NodeId)
)
}
#[pyo3(name = "get_node_by_key")]
fn node_by_key(&self, key: &str) -> PyResult<Option<i64>> {
dispatch_ok!(self, |db| nodes::node_by_key_single(db, key), |db| {
nodes::node_by_key_single(db, key)
})
}
#[pyo3(name = "get_node_key")]
fn node_key(&self, node_id: i64) -> PyResult<Option<String>> {
dispatch_ok!(
self,
|db| nodes::node_key_single(db, node_id as NodeId),
|db| nodes::node_key_single(db, node_id as NodeId)
)
}
fn list_nodes(&self) -> PyResult<Vec<i64>> {
dispatch_ok!(self, |db| nodes::list_nodes_single(db), |db| {
nodes::list_nodes_single(db)
})
}
fn count_nodes(&self) -> PyResult<i64> {
dispatch_ok!(self, |db| nodes::count_nodes_single(db), |db| {
nodes::count_nodes_single(db)
})
}
fn list_nodes_with_prefix(&self, prefix: &str) -> PyResult<Vec<i64>> {
dispatch_ok!(
self,
|db| nodes::list_nodes_with_prefix_single(db, prefix),
|db| nodes::list_nodes_with_prefix_single(db, prefix)
)
}
fn count_nodes_with_prefix(&self, prefix: &str) -> PyResult<i64> {
dispatch_ok!(
self,
|db| nodes::count_nodes_with_prefix_single(db, prefix),
|db| nodes::count_nodes_with_prefix_single(db, prefix)
)
}
fn batch_create_nodes(
&self,
input_nodes: Vec<(String, Vec<(u32, PropValue)>)>,
) -> PyResult<Vec<i64>> {
let guard = self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let mut ids = Vec::with_capacity(input_nodes.len());
let mut keys = Vec::with_capacity(input_nodes.len());
let mut props_list = Vec::with_capacity(input_nodes.len());
for (key, props) in input_nodes {
keys.push(key);
props_list.push(props);
}
db.begin_bulk()
.map_err(|e| PyRuntimeError::new_err(format!("Failed to begin bulk: {e}")))?;
let result: Result<(), PyErr> = (|| {
let key_refs: Vec<Option<&str>> = keys.iter().map(|k| Some(k.as_str())).collect();
let node_ids = db
.create_nodes_batch(&key_refs)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
for (node_id, props) in node_ids.iter().copied().zip(props_list.iter()) {
for (k, v) in props.iter() {
db.set_node_prop(node_id, *k as PropKeyId, v.clone().into())
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
}
ids.push(node_id as i64);
}
Ok(())
})();
match result {
Ok(()) => {
db.commit()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
Ok(ids)
}
Err(e) => {
let _ = db.rollback();
Err(e)
}
}
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
fn create_nodes_batch(&self, keys: Vec<Option<String>>) -> PyResult<Vec<i64>> {
let guard = self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let key_refs: Vec<Option<&str>> = keys.iter().map(|k| k.as_deref()).collect();
let node_ids = db
.create_nodes_batch(&key_refs)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to create nodes: {e}")))?;
Ok(node_ids.into_iter().map(|id| id as i64).collect())
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
fn add_edges_batch(&self, edges: Vec<(i64, u32, i64)>) -> PyResult<()> {
let guard = self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let core_edges: Vec<(NodeId, ETypeId, NodeId)> = edges
.into_iter()
.map(|(src, etype, dst)| (src as NodeId, etype as ETypeId, dst as NodeId))
.collect();
db.add_edges_batch(&core_edges)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to add edges: {e}")))
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
fn add_edges_with_props_batch(&self, edges: Vec<EdgePropsInput>) -> PyResult<()> {
let guard = self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
let core_edges: Vec<CoreEdgeWithProps> = edges
.into_iter()
.map(|(src, etype, dst, props)| {
let core_props = props
.into_iter()
.map(|(key_id, value)| (key_id as PropKeyId, value.into()))
.collect();
(src as NodeId, etype as ETypeId, dst as NodeId, core_props)
})
.collect();
db.add_edges_with_props_batch(core_edges)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to add edges: {e}")))
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
fn upsert_node(&self, key: String, props: Vec<(u32, Option<PropValue>)>) -> PyResult<i64> {
let core_props: Vec<(PropKeyId, Option<crate::types::PropValue>)> = props
.into_iter()
.map(|(k, v)| (k as PropKeyId, v.map(|value| value.into())))
.collect();
dispatch_tx!(
self,
|db| nodes::upsert_node_single(db, &key, &core_props),
|h| nodes::upsert_node_single(h, &key, &core_props)
)
}
fn upsert_node_by_id(&self, node_id: i64, props: Vec<(u32, Option<PropValue>)>) -> PyResult<i64> {
let core_props: Vec<(PropKeyId, Option<crate::types::PropValue>)> = props
.into_iter()
.map(|(k, v)| (k as PropKeyId, v.map(|value| value.into())))
.collect();
dispatch_tx!(
self,
|db| nodes::upsert_node_by_id_single(db, node_id as NodeId, &core_props),
|h| nodes::upsert_node_by_id_single(h, node_id as NodeId, &core_props)
)
}
fn add_edge(&self, src: i64, etype: u32, dst: i64) -> PyResult<()> {
dispatch_tx!(
self,
|db| edges::add_edge_single(db, src as NodeId, etype as ETypeId, dst as NodeId),
|h| edges::add_edge_single(h, src as NodeId, etype as ETypeId, dst as NodeId)
)
}
fn add_edge_by_name(&self, src: i64, etype_name: &str, dst: i64) -> PyResult<()> {
let guard = self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
edges::add_edge_by_name_single(db, src as NodeId, etype_name, dst as NodeId)
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
fn delete_edge(&self, src: i64, etype: u32, dst: i64) -> PyResult<()> {
dispatch_tx!(
self,
|db| edges::delete_edge_single(db, src as NodeId, etype as ETypeId, dst as NodeId),
|h| edges::delete_edge_single(h, src as NodeId, etype as ETypeId, dst as NodeId)
)
}
fn upsert_edge(
&self,
src: i64,
etype: u32,
dst: i64,
props: Vec<(u32, Option<PropValue>)>,
) -> PyResult<bool> {
let core_props: Vec<(PropKeyId, Option<crate::types::PropValue>)> = props
.into_iter()
.map(|(k, v)| (k as PropKeyId, v.map(|value| value.into())))
.collect();
dispatch_tx!(
self,
|db| edges::upsert_edge_single(
db,
src as NodeId,
etype as ETypeId,
dst as NodeId,
&core_props
),
|h| edges::upsert_edge_single(
h,
src as NodeId,
etype as ETypeId,
dst as NodeId,
&core_props
)
)
}
fn edge_exists(&self, src: i64, etype: u32, dst: i64) -> PyResult<bool> {
dispatch_ok!(
self,
|db| edges::edge_exists_single(db, src as NodeId, etype as ETypeId, dst as NodeId),
|db| edges::edge_exists_single(db, src as NodeId, etype as ETypeId, dst as NodeId)
)
}
#[pyo3(name = "get_out_edges")]
fn out_edges(&self, node_id: i64) -> PyResult<Vec<Edge>> {
dispatch_ok!(
self,
|db| edges::out_edges_single(db, node_id as NodeId),
|db| edges::out_edges_single(db, node_id as NodeId)
)
}
#[pyo3(name = "get_in_edges")]
fn in_edges(&self, node_id: i64) -> PyResult<Vec<Edge>> {
dispatch_ok!(
self,
|db| edges::in_edges_single(db, node_id as NodeId),
|db| edges::in_edges_single(db, node_id as NodeId)
)
}
#[pyo3(name = "get_out_degree")]
fn out_degree(&self, node_id: i64) -> PyResult<i64> {
dispatch_ok!(
self,
|db| edges::out_degree_single(db, node_id as NodeId),
|db| edges::out_degree_single(db, node_id as NodeId)
)
}
#[pyo3(name = "get_in_degree")]
fn in_degree(&self, node_id: i64) -> PyResult<i64> {
dispatch_ok!(
self,
|db| edges::in_degree_single(db, node_id as NodeId),
|db| edges::in_degree_single(db, node_id as NodeId)
)
}
fn count_edges(&self) -> PyResult<i64> {
dispatch_ok!(self, |db| edges::count_edges_single(db), |db| {
edges::count_edges_single(db, None)
})
}
fn count_edges_by_type(&self, etype: u32) -> PyResult<i64> {
dispatch_ok!(
self,
|db| edges::count_edges_by_type_single(db, etype as ETypeId),
|db| edges::count_edges_single(db, Some(etype as ETypeId))
)
}
#[pyo3(signature = (etype=None))]
fn list_edges(&self, etype: Option<u32>) -> PyResult<Vec<FullEdge>> {
dispatch_ok!(
self,
|db| edges::list_edges_single(db, etype.map(|e| e as ETypeId)),
|db| edges::list_edges_single(db, etype.map(|e| e as ETypeId))
)
}
fn set_node_prop(&self, node_id: i64, key_id: u32, value: PropValue) -> PyResult<()> {
dispatch_tx!(
self,
|db| properties::set_node_prop_single(
db,
node_id as NodeId,
key_id as PropKeyId,
value.into()
),
|h| properties::set_node_prop_single(
h,
node_id as NodeId,
key_id as PropKeyId,
value.clone().into()
)
)
}
fn set_node_prop_by_name(&self, node_id: i64, key_name: &str, value: PropValue) -> PyResult<()> {
let guard = self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
properties::set_node_prop_by_name_single(db, node_id as NodeId, key_name, value.into())
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyo3(name = "get_node_prop")]
fn node_prop(&self, node_id: i64, key_id: u32) -> PyResult<Option<PropValue>> {
dispatch_ok!(
self,
|db| properties::node_prop_single(db, node_id as NodeId, key_id as PropKeyId),
|db| properties::node_prop_single(db, node_id as NodeId, key_id as PropKeyId)
)
}
fn delete_node_prop(&self, node_id: i64, key_id: u32) -> PyResult<()> {
dispatch_tx!(
self,
|db| properties::delete_node_prop_single(db, node_id as NodeId, key_id as PropKeyId),
|h| properties::delete_node_prop_single(h, node_id as NodeId, key_id as PropKeyId)
)
}
#[pyo3(name = "get_node_props")]
fn node_props(&self, node_id: i64) -> PyResult<Option<Vec<NodeProp>>> {
dispatch_ok!(
self,
|db| properties::node_props_single(db, node_id as NodeId),
|db| properties::node_props_single(db, node_id as NodeId)
)
}
fn set_edge_prop(
&self,
src: i64,
etype: u32,
dst: i64,
key_id: u32,
value: PropValue,
) -> PyResult<()> {
dispatch_tx!(
self,
|db| properties::set_edge_prop_single(
db,
src as NodeId,
etype as ETypeId,
dst as NodeId,
key_id as PropKeyId,
value.into()
),
|h| properties::set_edge_prop_single(
h,
src as NodeId,
etype as ETypeId,
dst as NodeId,
key_id as PropKeyId,
value.clone().into()
)
)
}
fn set_edge_prop_by_name(
&self,
src: i64,
etype: u32,
dst: i64,
key_name: &str,
value: PropValue,
) -> PyResult<()> {
let guard = self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(db)) => properties::set_edge_prop_by_name_single(
db,
src as NodeId,
etype as ETypeId,
dst as NodeId,
key_name,
value.into(),
),
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyo3(name = "get_edge_prop")]
fn edge_prop(&self, src: i64, etype: u32, dst: i64, key_id: u32) -> PyResult<Option<PropValue>> {
dispatch_ok!(
self,
|db| properties::edge_prop_single(
db,
src as NodeId,
etype as ETypeId,
dst as NodeId,
key_id as PropKeyId
),
|db| properties::edge_prop_single(
db,
src as NodeId,
etype as ETypeId,
dst as NodeId,
key_id as PropKeyId
)
)
}
fn delete_edge_prop(&self, src: i64, etype: u32, dst: i64, key_id: u32) -> PyResult<()> {
dispatch_tx!(
self,
|db| properties::delete_edge_prop_single(
db,
src as NodeId,
etype as ETypeId,
dst as NodeId,
key_id as PropKeyId
),
|h| properties::delete_edge_prop_single(
h,
src as NodeId,
etype as ETypeId,
dst as NodeId,
key_id as PropKeyId
)
)
}
#[pyo3(name = "get_edge_props")]
fn edge_props(&self, src: i64, etype: u32, dst: i64) -> PyResult<Option<Vec<NodeProp>>> {
dispatch_ok!(
self,
|db| properties::edge_props_single(db, src as NodeId, etype as ETypeId, dst as NodeId),
|db| properties::edge_props_single(db, src as NodeId, etype as ETypeId, dst as NodeId)
)
}
#[pyo3(name = "get_node_prop_string")]
fn node_prop_string(&self, node_id: i64, key_id: u32) -> PyResult<Option<String>> {
dispatch_ok!(
self,
|db| properties::node_prop_string_single(db, node_id as NodeId, key_id as PropKeyId),
|db| properties::node_prop_string_single(db, node_id as NodeId, key_id as PropKeyId)
)
}
#[pyo3(name = "get_node_prop_int")]
fn node_prop_int(&self, node_id: i64, key_id: u32) -> PyResult<Option<i64>> {
dispatch_ok!(
self,
|db| properties::node_prop_int_single(db, node_id as NodeId, key_id as PropKeyId),
|db| properties::node_prop_int_single(db, node_id as NodeId, key_id as PropKeyId)
)
}
#[pyo3(name = "get_node_prop_float")]
fn node_prop_float(&self, node_id: i64, key_id: u32) -> PyResult<Option<f64>> {
dispatch_ok!(
self,
|db| properties::node_prop_float_single(db, node_id as NodeId, key_id as PropKeyId),
|db| properties::node_prop_float_single(db, node_id as NodeId, key_id as PropKeyId)
)
}
#[pyo3(name = "get_node_prop_bool")]
fn node_prop_bool(&self, node_id: i64, key_id: u32) -> PyResult<Option<bool>> {
dispatch_ok!(
self,
|db| properties::node_prop_bool_single(db, node_id as NodeId, key_id as PropKeyId),
|db| properties::node_prop_bool_single(db, node_id as NodeId, key_id as PropKeyId)
)
}
#[pyo3(name = "get_or_create_label")]
fn ensure_label(&self, name: &str) -> PyResult<u32> {
dispatch_ok!(self, |db| schema::ensure_label_single(db, name), |db| {
schema::ensure_label_single(db, name)
})
}
#[pyo3(name = "get_label_id")]
fn label_id(&self, name: &str) -> PyResult<Option<u32>> {
dispatch_ok!(self, |db| schema::label_id_single(db, name), |db| {
schema::label_id_single(db, name)
})
}
#[pyo3(name = "get_label_name")]
fn label_name(&self, id: u32) -> PyResult<Option<String>> {
dispatch_ok!(self, |db| schema::label_name_single(db, id), |db| {
schema::label_name_single(db, id)
})
}
#[pyo3(name = "get_or_create_etype")]
fn ensure_etype(&self, name: &str) -> PyResult<u32> {
dispatch_ok!(self, |db| schema::ensure_etype_single(db, name), |db| {
schema::ensure_etype_single(db, name)
})
}
#[pyo3(name = "get_etype_id")]
fn etype_id(&self, name: &str) -> PyResult<Option<u32>> {
dispatch_ok!(self, |db| schema::etype_id_single(db, name), |db| {
schema::etype_id_single(db, name)
})
}
#[pyo3(name = "get_etype_name")]
fn etype_name(&self, id: u32) -> PyResult<Option<String>> {
dispatch_ok!(self, |db| schema::etype_name_single(db, id), |db| {
schema::etype_name_single(db, id)
})
}
#[pyo3(name = "get_or_create_propkey")]
fn ensure_propkey(&self, name: &str) -> PyResult<u32> {
dispatch_ok!(self, |db| schema::ensure_propkey_single(db, name), |db| {
schema::ensure_propkey_single(db, name)
})
}
#[pyo3(name = "get_propkey_id")]
fn propkey_id(&self, name: &str) -> PyResult<Option<u32>> {
dispatch_ok!(self, |db| schema::propkey_id_single(db, name), |db| {
schema::propkey_id_single(db, name)
})
}
#[pyo3(name = "get_propkey_name")]
fn propkey_name(&self, id: u32) -> PyResult<Option<String>> {
dispatch_ok!(self, |db| schema::propkey_name_single(db, id), |db| {
schema::propkey_name_single(db, id)
})
}
fn define_label(&self, name: &str) -> PyResult<u32> {
dispatch_tx!(self, |db| labels::define_label_single(db, name), |h| {
labels::define_label_single(h, name)
})
}
fn add_node_label(&self, node_id: i64, label_id: u32) -> PyResult<()> {
dispatch_tx!(
self,
|db| labels::add_node_label_single(db, node_id as NodeId, label_id),
|h| labels::add_node_label_single(h, node_id as NodeId, label_id)
)
}
fn add_node_label_by_name(&self, node_id: i64, label_name: &str) -> PyResult<()> {
let guard = self
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(db)) => {
labels::add_node_label_by_name_single(db, node_id as NodeId, label_name)
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
fn remove_node_label(&self, node_id: i64, label_id: u32) -> PyResult<()> {
dispatch_tx!(
self,
|db| labels::remove_node_label_single(db, node_id as NodeId, label_id),
|h| labels::remove_node_label_single(h, node_id as NodeId, label_id)
)
}
fn node_has_label(&self, node_id: i64, label_id: u32) -> PyResult<bool> {
dispatch_ok!(
self,
|db| labels::node_has_label_single(db, node_id as NodeId, label_id),
|db| labels::node_has_label_single(db, node_id as NodeId, label_id)
)
}
#[pyo3(name = "get_node_labels")]
fn node_labels(&self, node_id: i64) -> PyResult<Vec<u32>> {
dispatch_ok!(
self,
|db| labels::node_labels_single(db, node_id as NodeId),
|db| labels::node_labels_single(db, node_id as NodeId)
)
}
fn set_node_vector(&self, node_id: i64, prop_key_id: u32, vector: Vec<f64>) -> PyResult<()> {
let v: Vec<f32> = vector.iter().map(|&x| x as f32).collect();
dispatch_tx!(
self,
|db| vectors::set_node_vector_single(db, node_id as NodeId, prop_key_id as PropKeyId, &v),
|h| vectors::set_node_vector_single(h, node_id as NodeId, prop_key_id as PropKeyId, &v)
)
}
#[pyo3(name = "get_node_vector")]
fn node_vector(&self, node_id: i64, prop_key_id: u32) -> PyResult<Option<Vec<f64>>> {
dispatch_ok!(
self,
|db| vectors::node_vector_single(db, node_id as NodeId, prop_key_id as PropKeyId),
|db| vectors::node_vector_single(db, node_id as NodeId, prop_key_id as PropKeyId)
)
}
fn delete_node_vector(&self, node_id: i64, prop_key_id: u32) -> PyResult<()> {
dispatch_tx!(
self,
|db| vectors::delete_node_vector_single(db, node_id as NodeId, prop_key_id as PropKeyId),
|h| vectors::delete_node_vector_single(h, node_id as NodeId, prop_key_id as PropKeyId)
)
}
fn has_node_vector(&self, node_id: i64, prop_key_id: u32) -> PyResult<bool> {
dispatch_ok!(
self,
|db| vectors::has_node_vector_single(db, node_id as NodeId, prop_key_id as PropKeyId),
|db| vectors::has_node_vector_single(db, node_id as NodeId, prop_key_id as PropKeyId)
)
}
#[pyo3(signature = (node_id, etype=None))]
fn traverse_out(&self, node_id: i64, etype: Option<u32>) -> PyResult<Vec<i64>> {
dispatch_ok!(
self,
|db| graph_traversal::traverse_out_single(db, node_id as NodeId, etype),
|db| graph_traversal::traverse_out_single(db, node_id as NodeId, etype)
)
}
#[pyo3(signature = (node_id, etype=None))]
fn traverse_out_with_keys(
&self,
node_id: i64,
etype: Option<u32>,
) -> PyResult<Vec<(i64, Option<String>)>> {
dispatch_ok!(
self,
|db| graph_traversal::traverse_out_with_keys_single(db, node_id as NodeId, etype),
|db| graph_traversal::traverse_out_with_keys_single(db, node_id as NodeId, etype)
)
}
#[pyo3(signature = (node_id, etype=None))]
fn traverse_out_count(&self, node_id: i64, etype: Option<u32>) -> PyResult<i64> {
dispatch_ok!(
self,
|db| graph_traversal::traverse_out_count_single(db, node_id as NodeId, etype),
|db| graph_traversal::traverse_out_count_single(db, node_id as NodeId, etype)
)
}
#[pyo3(signature = (node_id, etype=None))]
fn traverse_in(&self, node_id: i64, etype: Option<u32>) -> PyResult<Vec<i64>> {
dispatch_ok!(
self,
|db| graph_traversal::traverse_in_single(db, node_id as NodeId, etype),
|db| graph_traversal::traverse_in_single(db, node_id as NodeId, etype)
)
}
#[pyo3(signature = (node_id, etype=None))]
fn traverse_in_with_keys(
&self,
node_id: i64,
etype: Option<u32>,
) -> PyResult<Vec<(i64, Option<String>)>> {
dispatch_ok!(
self,
|db| graph_traversal::traverse_in_with_keys_single(db, node_id as NodeId, etype),
|db| graph_traversal::traverse_in_with_keys_single(db, node_id as NodeId, etype)
)
}
#[pyo3(signature = (node_id, etype=None))]
fn traverse_in_count(&self, node_id: i64, etype: Option<u32>) -> PyResult<i64> {
dispatch_ok!(
self,
|db| graph_traversal::traverse_in_count_single(db, node_id as NodeId, etype),
|db| graph_traversal::traverse_in_count_single(db, node_id as NodeId, etype)
)
}
fn traverse_multi(
&self,
start_ids: Vec<i64>,
steps: Vec<(String, Option<u32>)>,
) -> PyResult<Vec<(i64, Option<String>)>> {
dispatch_ok!(
self,
|db| graph_traversal::traverse_multi_single(db, start_ids.clone(), steps.clone()),
|db| graph_traversal::traverse_multi_single(db, start_ids.clone(), steps.clone())
)
}
fn traverse_multi_count(
&self,
start_ids: Vec<i64>,
steps: Vec<(String, Option<u32>)>,
) -> PyResult<i64> {
dispatch_ok!(
self,
|db| graph_traversal::traverse_multi_count_single(db, start_ids.clone(), steps.clone()),
|db| graph_traversal::traverse_multi_count_single(db, start_ids.clone(), steps.clone())
)
}
#[pyo3(signature = (node_id, max_depth, etype=None, min_depth=None, direction=None, unique=None))]
fn traverse(
&self,
node_id: i64,
max_depth: u32,
etype: Option<u32>,
min_depth: Option<u32>,
direction: Option<String>,
unique: Option<bool>,
) -> PyResult<Vec<PyTraversalResult>> {
dispatch_ok!(
self,
|db| graph_traversal::traverse_single(
db,
node_id as NodeId,
max_depth,
etype,
min_depth,
direction.clone(),
unique
),
|db| graph_traversal::traverse_single(
db,
node_id as NodeId,
max_depth,
etype,
min_depth,
direction.clone(),
unique
)
)
}
#[pyo3(signature = (source, target, etype=None, max_depth=None, direction=None))]
fn find_path_bfs(
&self,
source: i64,
target: i64,
etype: Option<u32>,
max_depth: Option<u32>,
direction: Option<String>,
) -> PyResult<PyPathResult> {
dispatch_ok!(
self,
|db| graph_traversal::find_path_bfs_single(
db,
source as NodeId,
target as NodeId,
etype,
max_depth,
direction.clone()
),
|db| graph_traversal::find_path_bfs_single(
db,
source as NodeId,
target as NodeId,
etype,
max_depth,
direction.clone()
)
)
}
#[pyo3(signature = (source, target, etype=None, max_depth=None, direction=None))]
fn find_path_dijkstra(
&self,
source: i64,
target: i64,
etype: Option<u32>,
max_depth: Option<u32>,
direction: Option<String>,
) -> PyResult<PyPathResult> {
dispatch_ok!(
self,
|db| graph_traversal::find_path_dijkstra_single(
db,
source as NodeId,
target as NodeId,
etype,
max_depth,
direction.clone()
),
|db| graph_traversal::find_path_dijkstra_single(
db,
source as NodeId,
target as NodeId,
etype,
max_depth,
direction.clone()
)
)
}
#[pyo3(signature = (source, target, etype=None, max_depth=None, direction=None))]
fn has_path(
&self,
source: i64,
target: i64,
etype: Option<u32>,
max_depth: Option<u32>,
direction: Option<String>,
) -> PyResult<bool> {
let path = dispatch_ok!(
self,
|db| graph_traversal::find_path_bfs_single(
db,
source as NodeId,
target as NodeId,
etype,
max_depth,
direction.clone()
),
|db| graph_traversal::find_path_bfs_single(
db,
source as NodeId,
target as NodeId,
etype,
max_depth,
direction.clone()
)
)?;
Ok(path.found)
}
#[pyo3(signature = (source, max_depth, etype=None))]
fn reachable_nodes(&self, source: i64, max_depth: u32, etype: Option<u32>) -> PyResult<Vec<i64>> {
let min_depth = Some(1);
let direction = Some("out".to_string());
let unique = Some(true);
let results = dispatch_ok!(
self,
|db| graph_traversal::traverse_single(
db,
source as NodeId,
max_depth,
etype,
min_depth,
direction.clone(),
unique
),
|db| graph_traversal::traverse_single(
db,
source as NodeId,
max_depth,
etype,
min_depth,
direction.clone(),
unique
)
)?;
Ok(results.into_iter().map(|r| r.node_id).collect())
}
fn checkpoint(&self) -> PyResult<()> {
dispatch!(self, |db| maintenance::checkpoint_single(db), |_db| Ok(()))
}
fn background_checkpoint(&self) -> PyResult<()> {
dispatch!(
self,
|db| maintenance::background_checkpoint_single(db),
|_db| Ok(())
)
}
#[pyo3(signature = (threshold=0.5))]
fn should_checkpoint(&self, threshold: f64) -> PyResult<bool> {
dispatch_ok!(
self,
|db| maintenance::should_checkpoint_single(db, threshold),
|_db| false
)
}
#[pyo3(signature = (options=None))]
fn optimize(&mut self, options: Option<SingleFileOptimizeOptions>) -> PyResult<()> {
dispatch_mut!(
self,
|db| {
let opts = match options {
Some(o) => Some(o.to_core()?),
None => None,
};
maintenance::optimize_single(db, opts)
},
|db| maintenance::optimize_single(db)
)
}
#[pyo3(signature = (shrink_wal=true, min_wal_size=None))]
fn vacuum(&mut self, shrink_wal: bool, min_wal_size: Option<u64>) -> PyResult<()> {
dispatch_mut!(
self,
|db| maintenance::vacuum_single(
db,
Some(RustVacuumOptions {
shrink_wal,
min_wal_size
})
),
|_db| Ok(())
)
}
fn stats(&self) -> PyResult<DbStats> {
dispatch_ok!(self, |db| maintenance::stats_single(db), |_db| {
unreachable!("multi-file database support removed")
})
}
fn check(&self) -> PyResult<CheckResult> {
dispatch_ok!(self, |db| maintenance::check_single(db), |_db| {
unreachable!("multi-file database support removed")
})
}
fn cache_is_enabled(&self) -> PyResult<bool> {
dispatch_ok!(self, |db| cache::cache_is_enabled(db), |_db| false)
}
fn cache_invalidate_node(&self, node_id: i64) -> PyResult<()> {
dispatch_ok!(
self,
|db| {
cache::cache_invalidate_node(db, node_id as NodeId);
},
|_db| ()
)
}
fn cache_invalidate_edge(&self, src: i64, etype: u32, dst: i64) -> PyResult<()> {
dispatch_ok!(
self,
|db| {
cache::cache_invalidate_edge(db, src as NodeId, etype as ETypeId, dst as NodeId);
},
|_db| ()
)
}
fn cache_invalidate_key(&self, key: &str) -> PyResult<()> {
dispatch_ok!(
self,
|db| {
cache::cache_invalidate_key(db, key);
},
|_db| ()
)
}
fn cache_clear(&self) -> PyResult<()> {
dispatch_ok!(
self,
|db| {
cache::cache_clear(db);
},
|_db| ()
)
}
fn cache_clear_query(&self) -> PyResult<()> {
dispatch_ok!(
self,
|db| {
cache::cache_clear_query(db);
},
|_db| ()
)
}
fn cache_clear_key(&self) -> PyResult<()> {
dispatch_ok!(
self,
|db| {
cache::cache_clear_key(db);
},
|_db| ()
)
}
fn cache_clear_property(&self) -> PyResult<()> {
dispatch_ok!(
self,
|db| {
cache::cache_clear_property(db);
},
|_db| ()
)
}
fn cache_clear_traversal(&self) -> PyResult<()> {
dispatch_ok!(
self,
|db| {
cache::cache_clear_traversal(db);
},
|_db| ()
)
}
fn cache_stats(&self) -> PyResult<Option<CacheStats>> {
dispatch_ok!(self, |db| cache::cache_stats(db), |_db| None)
}
fn cache_reset_stats(&self) -> PyResult<()> {
dispatch_ok!(
self,
|db| {
cache::cache_reset_stats(db);
},
|_db| ()
)
}
#[pyo3(signature = (options=None))]
fn stream_nodes(&self, options: Option<StreamOptions>) -> PyResult<Vec<Vec<i64>>> {
let opts = match options {
Some(o) => o.to_rust()?,
None => crate::streaming::StreamOptions::default(),
};
dispatch_ok!(
self,
|db| streaming_ops::stream_nodes_single(db, opts.clone()),
|db| streaming_ops::stream_nodes_single(db, opts.clone())
)
}
#[pyo3(signature = (options=None))]
fn stream_nodes_with_props(
&self,
options: Option<StreamOptions>,
) -> PyResult<Vec<Vec<NodeWithProps>>> {
let opts = match options {
Some(o) => o.to_rust()?,
None => crate::streaming::StreamOptions::default(),
};
dispatch_ok!(
self,
|db| streaming_ops::stream_nodes_with_props_single(db, opts.clone()),
|db| streaming_ops::stream_nodes_with_props_single(db, opts.clone())
)
}
#[pyo3(signature = (options=None))]
fn stream_edges(&self, options: Option<StreamOptions>) -> PyResult<Vec<Vec<FullEdge>>> {
let opts = match options {
Some(o) => o.to_rust()?,
None => crate::streaming::StreamOptions::default(),
};
dispatch_ok!(
self,
|db| streaming_ops::stream_edges_single(db, opts.clone()),
|db| streaming_ops::stream_edges_single(db, opts.clone())
)
}
#[pyo3(signature = (options=None))]
fn stream_edges_with_props(
&self,
options: Option<StreamOptions>,
) -> PyResult<Vec<Vec<EdgeWithProps>>> {
let opts = match options {
Some(o) => o.to_rust()?,
None => crate::streaming::StreamOptions::default(),
};
dispatch_ok!(
self,
|db| streaming_ops::stream_edges_with_props_single(db, opts.clone()),
|db| streaming_ops::stream_edges_with_props_single(db, opts.clone())
)
}
#[pyo3(signature = (options=None))]
#[pyo3(name = "get_nodes_page")]
fn nodes_page(&self, options: Option<PaginationOptions>) -> PyResult<NodePage> {
let opts = match options {
Some(o) => o.to_rust()?,
None => crate::streaming::PaginationOptions::default(),
};
dispatch_ok!(
self,
|db| streaming_ops::nodes_page_single(db, opts.clone()),
|db| streaming_ops::nodes_page_single(db, opts.clone())
)
}
#[pyo3(signature = (options=None))]
#[pyo3(name = "get_edges_page")]
fn edges_page(&self, options: Option<PaginationOptions>) -> PyResult<EdgePage> {
let opts = match options {
Some(o) => o.to_rust()?,
None => crate::streaming::PaginationOptions::default(),
};
dispatch_ok!(
self,
|db| streaming_ops::edges_page_single(db, opts.clone()),
|db| streaming_ops::edges_page_single(db, opts.clone())
)
}
#[pyo3(signature = (path, options=None))]
fn export_to_json(&self, path: String, options: Option<ExportOptions>) -> PyResult<ExportResult> {
let opts = options.unwrap_or_default();
dispatch!(
self,
|db| export_import::export_to_json_single(db, path.clone(), opts.clone()),
|db| export_import::export_to_json_single(db, path.clone(), opts.clone())
)
}
#[pyo3(signature = (path, options=None))]
fn export_to_jsonl(
&self,
path: String,
options: Option<ExportOptions>,
) -> PyResult<ExportResult> {
let opts = options.unwrap_or_default();
dispatch!(
self,
|db| export_import::export_to_jsonl_single(db, path.clone(), opts.clone()),
|db| export_import::export_to_jsonl_single(db, path.clone(), opts.clone())
)
}
#[pyo3(signature = (path, options=None))]
fn import_from_json(
&self,
path: String,
options: Option<ImportOptions>,
) -> PyResult<ImportResult> {
let opts = options.unwrap_or_default();
dispatch!(
self,
|db| export_import::import_from_json_single(db, path.clone(), opts.clone()),
|db| export_import::import_from_json_single(db, path.clone(), opts.clone())
)
}
}
#[pyfunction]
#[pyo3(signature = (path, options=None))]
pub fn open_database(path: String, options: Option<OpenOptions>) -> PyResult<PyDatabase> {
PyDatabase::new(path, options)
}
#[pyfunction]
pub fn recommended_safe_profile() -> RuntimeProfile {
RuntimeProfile::from_kite_runtime_profile(RustKiteRuntimeProfile::safe())
}
#[pyfunction]
pub fn recommended_balanced_profile() -> RuntimeProfile {
RuntimeProfile::from_kite_runtime_profile(RustKiteRuntimeProfile::balanced())
}
#[pyfunction]
pub fn recommended_reopen_heavy_profile() -> RuntimeProfile {
RuntimeProfile::from_kite_runtime_profile(RustKiteRuntimeProfile::reopen_heavy())
}
#[pyfunction]
pub fn collect_metrics(db: &PyDatabase) -> PyResult<DatabaseMetrics> {
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => Ok(DatabaseMetrics::from(
core_metrics::collect_metrics_single_file(d),
)),
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyfunction]
pub fn collect_replication_metrics_prometheus(db: &PyDatabase) -> PyResult<String> {
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => {
Ok(core_metrics::collect_replication_metrics_prometheus_single_file(d))
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyfunction]
pub fn collect_replication_metrics_otel_json(db: &PyDatabase) -> PyResult<String> {
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => {
Ok(core_metrics::collect_replication_metrics_otel_json_single_file(d))
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyfunction]
pub fn collect_replication_metrics_otel_protobuf(db: &PyDatabase) -> PyResult<Vec<u8>> {
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => {
Ok(core_metrics::collect_replication_metrics_otel_protobuf_single_file(d))
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyfunction]
#[pyo3(signature = (db, include_data=false))]
pub fn collect_replication_snapshot_transport_json(
db: &PyDatabase,
include_data: bool,
) -> PyResult<String> {
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => d
.primary_export_snapshot_transport_json(include_data)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to export replication snapshot: {e}"))),
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyfunction]
#[pyo3(signature = (db, cursor=None, max_frames=128, max_bytes=1048576, include_payload=true))]
pub fn collect_replication_log_transport_json(
db: &PyDatabase,
cursor: Option<String>,
max_frames: i64,
max_bytes: i64,
include_payload: bool,
) -> PyResult<String> {
if max_frames <= 0 {
return Err(PyRuntimeError::new_err("max_frames must be positive"));
}
if max_bytes <= 0 {
return Err(PyRuntimeError::new_err("max_bytes must be positive"));
}
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => d
.primary_export_log_transport_json(
cursor.as_deref(),
max_frames as usize,
max_bytes as usize,
include_payload,
)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to export replication log: {e}"))),
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[allow(clippy::too_many_arguments)]
fn build_otel_push_options_py(
timeout_ms: i64,
bearer_token: Option<String>,
retry_max_attempts: i64,
retry_backoff_ms: i64,
retry_backoff_max_ms: i64,
retry_jitter_ratio: f64,
adaptive_retry: bool,
adaptive_retry_mode: Option<String>,
adaptive_retry_ewma_alpha: f64,
circuit_breaker_failure_threshold: i64,
circuit_breaker_open_ms: i64,
circuit_breaker_half_open_probes: i64,
circuit_breaker_state_path: Option<String>,
circuit_breaker_state_url: Option<String>,
circuit_breaker_state_patch: bool,
circuit_breaker_state_patch_batch: bool,
circuit_breaker_state_patch_batch_max_keys: i64,
circuit_breaker_state_patch_merge: bool,
circuit_breaker_state_patch_merge_max_keys: i64,
circuit_breaker_state_patch_retry_max_attempts: i64,
circuit_breaker_state_cas: bool,
circuit_breaker_state_lease_id: Option<String>,
circuit_breaker_scope_key: Option<String>,
compression_gzip: bool,
https_only: bool,
ca_cert_pem_path: Option<String>,
client_cert_pem_path: Option<String>,
client_key_pem_path: Option<String>,
) -> PyResult<core_metrics::OtlpHttpPushOptions> {
if timeout_ms <= 0 {
return Err(PyRuntimeError::new_err("timeout_ms must be positive"));
}
if retry_max_attempts <= 0 {
return Err(PyRuntimeError::new_err(
"retry_max_attempts must be positive",
));
}
if retry_backoff_ms < 0 {
return Err(PyRuntimeError::new_err(
"retry_backoff_ms must be non-negative",
));
}
if retry_backoff_max_ms < 0 {
return Err(PyRuntimeError::new_err(
"retry_backoff_max_ms must be non-negative",
));
}
if retry_backoff_max_ms > 0 && retry_backoff_max_ms < retry_backoff_ms {
return Err(PyRuntimeError::new_err(
"retry_backoff_max_ms must be >= retry_backoff_ms when non-zero",
));
}
if !(0.0..=1.0).contains(&retry_jitter_ratio) {
return Err(PyRuntimeError::new_err(
"retry_jitter_ratio must be within [0.0, 1.0]",
));
}
let adaptive_retry_mode = match adaptive_retry_mode
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
None => core_metrics::OtlpAdaptiveRetryMode::Linear,
Some(value) if value.eq_ignore_ascii_case("linear") => {
core_metrics::OtlpAdaptiveRetryMode::Linear
}
Some(value) if value.eq_ignore_ascii_case("ewma") => core_metrics::OtlpAdaptiveRetryMode::Ewma,
Some(_) => {
return Err(PyRuntimeError::new_err(
"adaptive_retry_mode must be one of: linear, ewma",
));
}
};
if !(0.0..=1.0).contains(&adaptive_retry_ewma_alpha) {
return Err(PyRuntimeError::new_err(
"adaptive_retry_ewma_alpha must be within [0.0, 1.0]",
));
}
if circuit_breaker_failure_threshold < 0 {
return Err(PyRuntimeError::new_err(
"circuit_breaker_failure_threshold must be non-negative",
));
}
if circuit_breaker_open_ms < 0 {
return Err(PyRuntimeError::new_err(
"circuit_breaker_open_ms must be non-negative",
));
}
if circuit_breaker_failure_threshold > 0 && circuit_breaker_open_ms == 0 {
return Err(PyRuntimeError::new_err(
"circuit_breaker_open_ms must be > 0 when circuit_breaker_failure_threshold is enabled",
));
}
if circuit_breaker_half_open_probes < 0 {
return Err(PyRuntimeError::new_err(
"circuit_breaker_half_open_probes must be non-negative",
));
}
if circuit_breaker_failure_threshold > 0 && circuit_breaker_half_open_probes == 0 {
return Err(PyRuntimeError::new_err(
"circuit_breaker_half_open_probes must be > 0 when circuit_breaker_failure_threshold is enabled",
));
}
if let Some(path) = circuit_breaker_state_path.as_deref() {
if path.trim().is_empty() {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_path must not be empty when provided",
));
}
}
if let Some(url) = circuit_breaker_state_url.as_deref() {
let trimmed = url.trim();
if trimmed.is_empty() {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_url must not be empty when provided",
));
}
if !(trimmed.starts_with("http://") || trimmed.starts_with("https://")) {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_url must use http:// or https://",
));
}
if https_only && trimmed.starts_with("http://") {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_url must use https when https_only is enabled",
));
}
}
if circuit_breaker_state_path.is_some() && circuit_breaker_state_url.is_some() {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_path and circuit_breaker_state_url are mutually exclusive",
));
}
if circuit_breaker_state_patch && circuit_breaker_state_url.is_none() {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_patch requires circuit_breaker_state_url",
));
}
if circuit_breaker_state_patch_batch && !circuit_breaker_state_patch {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_patch_batch requires circuit_breaker_state_patch",
));
}
if circuit_breaker_state_patch_merge && !circuit_breaker_state_patch {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_patch_merge requires circuit_breaker_state_patch",
));
}
if circuit_breaker_state_patch_batch_max_keys <= 0 {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_patch_batch_max_keys must be > 0",
));
}
if circuit_breaker_state_patch_merge_max_keys <= 0 {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_patch_merge_max_keys must be > 0",
));
}
if circuit_breaker_state_patch_retry_max_attempts <= 0 {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_patch_retry_max_attempts must be > 0",
));
}
if circuit_breaker_state_cas && circuit_breaker_state_url.is_none() {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_cas requires circuit_breaker_state_url",
));
}
if let Some(lease_id) = circuit_breaker_state_lease_id.as_deref() {
if lease_id.trim().is_empty() {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_lease_id must not be empty when provided",
));
}
if circuit_breaker_state_url.is_none() {
return Err(PyRuntimeError::new_err(
"circuit_breaker_state_lease_id requires circuit_breaker_state_url",
));
}
}
if let Some(scope_key) = circuit_breaker_scope_key.as_deref() {
if scope_key.trim().is_empty() {
return Err(PyRuntimeError::new_err(
"circuit_breaker_scope_key must not be empty when provided",
));
}
}
Ok(core_metrics::OtlpHttpPushOptions {
timeout_ms: timeout_ms as u64,
bearer_token,
retry_max_attempts: retry_max_attempts as u32,
retry_backoff_ms: retry_backoff_ms as u64,
retry_backoff_max_ms: retry_backoff_max_ms as u64,
retry_jitter_ratio,
adaptive_retry_mode,
adaptive_retry_ewma_alpha,
adaptive_retry,
circuit_breaker_failure_threshold: circuit_breaker_failure_threshold as u32,
circuit_breaker_open_ms: circuit_breaker_open_ms as u64,
circuit_breaker_half_open_probes: circuit_breaker_half_open_probes as u32,
circuit_breaker_state_path,
circuit_breaker_state_url,
circuit_breaker_state_patch,
circuit_breaker_state_patch_batch,
circuit_breaker_state_patch_batch_max_keys: circuit_breaker_state_patch_batch_max_keys as u32,
circuit_breaker_state_patch_merge,
circuit_breaker_state_patch_merge_max_keys: circuit_breaker_state_patch_merge_max_keys as u32,
circuit_breaker_state_patch_retry_max_attempts: circuit_breaker_state_patch_retry_max_attempts
as u32,
circuit_breaker_state_cas,
circuit_breaker_state_lease_id,
circuit_breaker_scope_key,
compression_gzip,
tls: core_metrics::OtlpHttpTlsOptions {
https_only,
ca_cert_pem_path,
client_cert_pem_path,
client_key_pem_path,
},
})
}
#[pyfunction]
#[pyo3(signature = (
db,
endpoint,
timeout_ms=5000,
bearer_token=None,
retry_max_attempts=1,
retry_backoff_ms=100,
retry_backoff_max_ms=2000,
retry_jitter_ratio=0.0,
adaptive_retry=false,
adaptive_retry_mode=None,
adaptive_retry_ewma_alpha=0.3,
circuit_breaker_failure_threshold=0,
circuit_breaker_open_ms=0,
circuit_breaker_half_open_probes=1,
circuit_breaker_state_path=None,
circuit_breaker_state_url=None,
circuit_breaker_state_patch=false,
circuit_breaker_state_patch_batch=false,
circuit_breaker_state_patch_batch_max_keys=8,
circuit_breaker_state_patch_merge=false,
circuit_breaker_state_patch_merge_max_keys=32,
circuit_breaker_state_patch_retry_max_attempts=1,
circuit_breaker_state_cas=false,
circuit_breaker_state_lease_id=None,
circuit_breaker_scope_key=None,
compression_gzip=false,
https_only=false,
ca_cert_pem_path=None,
client_cert_pem_path=None,
client_key_pem_path=None
))]
pub fn push_replication_metrics_otel_json(
db: &PyDatabase,
endpoint: String,
timeout_ms: i64,
bearer_token: Option<String>,
retry_max_attempts: i64,
retry_backoff_ms: i64,
retry_backoff_max_ms: i64,
retry_jitter_ratio: f64,
adaptive_retry: bool,
adaptive_retry_mode: Option<String>,
adaptive_retry_ewma_alpha: f64,
circuit_breaker_failure_threshold: i64,
circuit_breaker_open_ms: i64,
circuit_breaker_half_open_probes: i64,
circuit_breaker_state_path: Option<String>,
circuit_breaker_state_url: Option<String>,
circuit_breaker_state_patch: bool,
circuit_breaker_state_patch_batch: bool,
circuit_breaker_state_patch_batch_max_keys: i64,
circuit_breaker_state_patch_merge: bool,
circuit_breaker_state_patch_merge_max_keys: i64,
circuit_breaker_state_patch_retry_max_attempts: i64,
circuit_breaker_state_cas: bool,
circuit_breaker_state_lease_id: Option<String>,
circuit_breaker_scope_key: Option<String>,
compression_gzip: bool,
https_only: bool,
ca_cert_pem_path: Option<String>,
client_cert_pem_path: Option<String>,
client_key_pem_path: Option<String>,
) -> PyResult<(i64, String)> {
let options = build_otel_push_options_py(
timeout_ms,
bearer_token,
retry_max_attempts,
retry_backoff_ms,
retry_backoff_max_ms,
retry_jitter_ratio,
adaptive_retry,
adaptive_retry_mode,
adaptive_retry_ewma_alpha,
circuit_breaker_failure_threshold,
circuit_breaker_open_ms,
circuit_breaker_half_open_probes,
circuit_breaker_state_path,
circuit_breaker_state_url,
circuit_breaker_state_patch,
circuit_breaker_state_patch_batch,
circuit_breaker_state_patch_batch_max_keys,
circuit_breaker_state_patch_merge,
circuit_breaker_state_patch_merge_max_keys,
circuit_breaker_state_patch_retry_max_attempts,
circuit_breaker_state_cas,
circuit_breaker_state_lease_id,
circuit_breaker_scope_key,
compression_gzip,
https_only,
ca_cert_pem_path,
client_cert_pem_path,
client_key_pem_path,
)?;
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => {
let result = core_metrics::push_replication_metrics_otel_json_single_file_with_options(
d, &endpoint, &options,
)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to push replication metrics: {e}")))?;
Ok((result.status_code, result.response_body))
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyfunction]
#[pyo3(signature = (
db,
endpoint,
timeout_ms=5000,
bearer_token=None,
retry_max_attempts=1,
retry_backoff_ms=100,
retry_backoff_max_ms=2000,
retry_jitter_ratio=0.0,
adaptive_retry=false,
adaptive_retry_mode=None,
adaptive_retry_ewma_alpha=0.3,
circuit_breaker_failure_threshold=0,
circuit_breaker_open_ms=0,
circuit_breaker_half_open_probes=1,
circuit_breaker_state_path=None,
circuit_breaker_state_url=None,
circuit_breaker_state_patch=false,
circuit_breaker_state_patch_batch=false,
circuit_breaker_state_patch_batch_max_keys=8,
circuit_breaker_state_patch_merge=false,
circuit_breaker_state_patch_merge_max_keys=32,
circuit_breaker_state_patch_retry_max_attempts=1,
circuit_breaker_state_cas=false,
circuit_breaker_state_lease_id=None,
circuit_breaker_scope_key=None,
compression_gzip=false,
https_only=false,
ca_cert_pem_path=None,
client_cert_pem_path=None,
client_key_pem_path=None
))]
pub fn push_replication_metrics_otel_protobuf(
db: &PyDatabase,
endpoint: String,
timeout_ms: i64,
bearer_token: Option<String>,
retry_max_attempts: i64,
retry_backoff_ms: i64,
retry_backoff_max_ms: i64,
retry_jitter_ratio: f64,
adaptive_retry: bool,
adaptive_retry_mode: Option<String>,
adaptive_retry_ewma_alpha: f64,
circuit_breaker_failure_threshold: i64,
circuit_breaker_open_ms: i64,
circuit_breaker_half_open_probes: i64,
circuit_breaker_state_path: Option<String>,
circuit_breaker_state_url: Option<String>,
circuit_breaker_state_patch: bool,
circuit_breaker_state_patch_batch: bool,
circuit_breaker_state_patch_batch_max_keys: i64,
circuit_breaker_state_patch_merge: bool,
circuit_breaker_state_patch_merge_max_keys: i64,
circuit_breaker_state_patch_retry_max_attempts: i64,
circuit_breaker_state_cas: bool,
circuit_breaker_state_lease_id: Option<String>,
circuit_breaker_scope_key: Option<String>,
compression_gzip: bool,
https_only: bool,
ca_cert_pem_path: Option<String>,
client_cert_pem_path: Option<String>,
client_key_pem_path: Option<String>,
) -> PyResult<(i64, String)> {
let options = build_otel_push_options_py(
timeout_ms,
bearer_token,
retry_max_attempts,
retry_backoff_ms,
retry_backoff_max_ms,
retry_jitter_ratio,
adaptive_retry,
adaptive_retry_mode,
adaptive_retry_ewma_alpha,
circuit_breaker_failure_threshold,
circuit_breaker_open_ms,
circuit_breaker_half_open_probes,
circuit_breaker_state_path,
circuit_breaker_state_url,
circuit_breaker_state_patch,
circuit_breaker_state_patch_batch,
circuit_breaker_state_patch_batch_max_keys,
circuit_breaker_state_patch_merge,
circuit_breaker_state_patch_merge_max_keys,
circuit_breaker_state_patch_retry_max_attempts,
circuit_breaker_state_cas,
circuit_breaker_state_lease_id,
circuit_breaker_scope_key,
compression_gzip,
https_only,
ca_cert_pem_path,
client_cert_pem_path,
client_key_pem_path,
)?;
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => {
let result = core_metrics::push_replication_metrics_otel_protobuf_single_file_with_options(
d, &endpoint, &options,
)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to push replication metrics: {e}")))?;
Ok((result.status_code, result.response_body))
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyfunction]
#[pyo3(signature = (
db,
endpoint,
timeout_ms=5000,
bearer_token=None,
retry_max_attempts=1,
retry_backoff_ms=100,
retry_backoff_max_ms=2000,
retry_jitter_ratio=0.0,
adaptive_retry=false,
adaptive_retry_mode=None,
adaptive_retry_ewma_alpha=0.3,
circuit_breaker_failure_threshold=0,
circuit_breaker_open_ms=0,
circuit_breaker_half_open_probes=1,
circuit_breaker_state_path=None,
circuit_breaker_state_url=None,
circuit_breaker_state_patch=false,
circuit_breaker_state_patch_batch=false,
circuit_breaker_state_patch_batch_max_keys=8,
circuit_breaker_state_patch_merge=false,
circuit_breaker_state_patch_merge_max_keys=32,
circuit_breaker_state_patch_retry_max_attempts=1,
circuit_breaker_state_cas=false,
circuit_breaker_state_lease_id=None,
circuit_breaker_scope_key=None,
compression_gzip=false,
https_only=false,
ca_cert_pem_path=None,
client_cert_pem_path=None,
client_key_pem_path=None
))]
pub fn push_replication_metrics_otel_grpc(
db: &PyDatabase,
endpoint: String,
timeout_ms: i64,
bearer_token: Option<String>,
retry_max_attempts: i64,
retry_backoff_ms: i64,
retry_backoff_max_ms: i64,
retry_jitter_ratio: f64,
adaptive_retry: bool,
adaptive_retry_mode: Option<String>,
adaptive_retry_ewma_alpha: f64,
circuit_breaker_failure_threshold: i64,
circuit_breaker_open_ms: i64,
circuit_breaker_half_open_probes: i64,
circuit_breaker_state_path: Option<String>,
circuit_breaker_state_url: Option<String>,
circuit_breaker_state_patch: bool,
circuit_breaker_state_patch_batch: bool,
circuit_breaker_state_patch_batch_max_keys: i64,
circuit_breaker_state_patch_merge: bool,
circuit_breaker_state_patch_merge_max_keys: i64,
circuit_breaker_state_patch_retry_max_attempts: i64,
circuit_breaker_state_cas: bool,
circuit_breaker_state_lease_id: Option<String>,
circuit_breaker_scope_key: Option<String>,
compression_gzip: bool,
https_only: bool,
ca_cert_pem_path: Option<String>,
client_cert_pem_path: Option<String>,
client_key_pem_path: Option<String>,
) -> PyResult<(i64, String)> {
let options = build_otel_push_options_py(
timeout_ms,
bearer_token,
retry_max_attempts,
retry_backoff_ms,
retry_backoff_max_ms,
retry_jitter_ratio,
adaptive_retry,
adaptive_retry_mode,
adaptive_retry_ewma_alpha,
circuit_breaker_failure_threshold,
circuit_breaker_open_ms,
circuit_breaker_half_open_probes,
circuit_breaker_state_path,
circuit_breaker_state_url,
circuit_breaker_state_patch,
circuit_breaker_state_patch_batch,
circuit_breaker_state_patch_batch_max_keys,
circuit_breaker_state_patch_merge,
circuit_breaker_state_patch_merge_max_keys,
circuit_breaker_state_patch_retry_max_attempts,
circuit_breaker_state_cas,
circuit_breaker_state_lease_id,
circuit_breaker_scope_key,
compression_gzip,
https_only,
ca_cert_pem_path,
client_cert_pem_path,
client_key_pem_path,
)?;
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => {
let result = core_metrics::push_replication_metrics_otel_grpc_single_file_with_options(
d, &endpoint, &options,
)
.map_err(|e| PyRuntimeError::new_err(format!("Failed to push replication metrics: {e}")))?;
Ok((result.status_code, result.response_body))
}
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyfunction]
pub fn health_check(db: &PyDatabase) -> PyResult<HealthCheckResult> {
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => Ok(HealthCheckResult::from(
core_metrics::health_check_single_file(d),
)),
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyfunction]
#[pyo3(signature = (db, backup_path, options=None))]
pub fn create_backup(
db: &PyDatabase,
backup_path: String,
options: Option<BackupOptions>,
) -> PyResult<BackupResult> {
let opts: core_backup::BackupOptions = options.unwrap_or_default().into();
let path = PathBuf::from(backup_path);
let guard = db
.inner
.read()
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
match guard.as_ref() {
Some(DatabaseInner::SingleFile(d)) => core_backup::create_backup_single_file(d, &path, opts)
.map(BackupResult::from)
.map_err(|e| PyRuntimeError::new_err(e.to_string())),
None => Err(PyRuntimeError::new_err("Database is closed")),
}
}
#[pyfunction]
#[pyo3(signature = (backup_path, restore_path, options=None))]
pub fn restore_backup(
backup_path: String,
restore_path: String,
options: Option<RestoreOptions>,
) -> PyResult<String> {
let opts: core_backup::RestoreOptions = options.unwrap_or_default().into();
core_backup::restore_backup(backup_path, restore_path, opts)
.map(|p| p.to_string_lossy().to_string())
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
#[pyfunction]
pub fn backup_info(backup_path: String) -> PyResult<BackupResult> {
core_backup::backup_info(backup_path)
.map(BackupResult::from)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
#[pyfunction]
#[pyo3(signature = (db_path, backup_path, options=None))]
pub fn create_offline_backup(
db_path: String,
backup_path: String,
options: Option<OfflineBackupOptions>,
) -> PyResult<BackupResult> {
let opts: core_backup::OfflineBackupOptions = options.unwrap_or_default().into();
core_backup::create_offline_backup(db_path, backup_path, opts)
.map(BackupResult::from)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))
}
impl From<crate::api::pathfinding::PathResult> for PyPathResult {
fn from(r: crate::api::pathfinding::PathResult) -> Self {
Self {
path: r.path.iter().map(|&id| id as i64).collect(),
edges: r
.edges
.iter()
.map(|&(s, e, d)| PyPathEdge {
src: s as i64,
etype: e,
dst: d as i64,
})
.collect(),
total_weight: r.total_weight,
found: r.found,
}
}
}