from __future__ import annotations
from dataclasses import dataclass, field
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
List,
Optional,
Protocol,
Tuple,
TypeVar,
Union,
)
from .schema import EdgeDef, NodeDef, PropDef
if TYPE_CHECKING:
from kitedb._kitedb import Database, PropValue
class NodeRef(Generic[TypeVar("N", bound=NodeDef)]):
__slots__ = ('id', 'key', 'node_def', 'props')
def __init__(
self,
id: int,
key: str,
node_def: NodeDef[Any],
props: Optional[Dict[str, Any]] = None,
):
self.id = id
self.key = key
self.node_def = node_def
self.props = props if props is not None else {}
def __getattr__(self, name: str) -> Any:
props = object.__getattribute__(self, 'props')
if name in props:
return props[name]
node_def = object.__getattribute__(self, 'node_def')
if name in node_def.props:
return None raise AttributeError(f"'{type(self).__name__}' object has no attribute '{name}'")
def __repr__(self) -> str:
props_str = ", ".join(f"{k}={v!r}" for k, v in self.props.items())
return f"NodeRef(id={self.id}, key={self.key!r}, {props_str})"
def __eq__(self, other: object) -> bool:
if isinstance(other, NodeRef):
return self.id == other.id
return False
def __hash__(self) -> int:
return hash(self.id)
N = TypeVar("N", bound=NodeDef)
E = TypeVar("E", bound=EdgeDef)
def to_prop_value(prop_def: PropDef[Any], value: Any, PropValue: type) -> PropValue:
if value is None:
return PropValue.null()
if prop_def.type == "string":
return PropValue.string(str(value))
elif prop_def.type == "int":
return PropValue.int(int(value))
elif prop_def.type == "float":
return PropValue.float(float(value))
elif prop_def.type == "bool":
return PropValue.bool(bool(value))
elif prop_def.type == "vector":
return PropValue.vector([float(v) for v in value])
else:
raise ValueError(f"Unknown property type: {prop_def.type}")
def from_prop_value(pv: PropValue) -> Any:
return pv.value()
class InsertExecutor(Generic[N]):
def __init__(
self,
db: Database,
node_def: N,
data: Union[Dict[str, Any], List[Dict[str, Any]]],
resolve_prop_key_id: Callable[[NodeDef, str], int],
use_batch: bool = False,
):
self._db = db
self._node_def = node_def
self._data = data if isinstance(data, list) else [data]
self._is_single = not isinstance(data, list)
self._resolve_prop_key_id = resolve_prop_key_id
self._use_batch = use_batch
def returning(self) -> Union[NodeRef[N], List[NodeRef[N]]]:
from kitedb._kitedb import PropValue
if self._use_batch and len(self._data) > 1:
return self._returning_batch()
results: List[NodeRef[N]] = []
in_tx = self._db.has_transaction()
if not in_tx:
self._db.begin()
try:
for item in self._data:
key_arg = item.pop("key", None)
if key_arg is None:
raise ValueError("Insert requires a 'key' field")
full_key = self._node_def.key_fn(key_arg)
node_id = self._db.create_node(full_key)
for prop_name, value in item.items():
if value is None:
continue
prop_def = self._node_def.props.get(prop_name)
if prop_def is None:
continue
prop_key_id = self._resolve_prop_key_id(self._node_def, prop_name)
prop_value = to_prop_value(prop_def, value, PropValue)
self._db.set_node_prop(node_id, prop_key_id, prop_value)
results.append(NodeRef(
id=node_id,
key=full_key,
node_def=self._node_def,
props=item,
))
if not in_tx:
self._db.commit()
except Exception:
if not in_tx:
self._db.rollback()
raise
return results[0] if self._is_single else results
def _returning_batch(self) -> Union[NodeRef[N], List[NodeRef[N]]]:
from kitedb._kitedb import PropValue
batch_nodes = []
items_for_results = []
for item in self._data:
item_copy = dict(item) key_arg = item_copy.pop("key", None)
if key_arg is None:
raise ValueError("Insert requires a 'key' field")
full_key = self._node_def.key_fn(key_arg)
props_list = []
for prop_name, value in item_copy.items():
if value is None:
continue
prop_def = self._node_def.props.get(prop_name)
if prop_def is None:
continue
prop_key_id = self._resolve_prop_key_id(self._node_def, prop_name)
prop_value = to_prop_value(prop_def, value, PropValue)
props_list.append((prop_key_id, prop_value))
batch_nodes.append((full_key, props_list))
items_for_results.append((full_key, item_copy))
node_ids = self._db.batch_create_nodes(batch_nodes)
results = []
for node_id, (full_key, item) in zip(node_ids, items_for_results):
results.append(NodeRef(
id=node_id,
key=full_key,
node_def=self._node_def,
props=item,
))
return results[0] if self._is_single else results
def execute(self) -> None:
self.returning()
class InsertBuilder(Generic[N]):
def __init__(
self,
db: Database,
node_def: N,
resolve_prop_key_id: Callable[[NodeDef, str], int],
):
self._db = db
self._node_def = node_def
self._resolve_prop_key_id = resolve_prop_key_id
def values(
self,
data: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
**kwargs: Any,
) -> InsertExecutor[N]:
if data is None:
data = kwargs
elif isinstance(data, list):
if kwargs:
raise ValueError("Cannot combine list data with keyword arguments")
return InsertExecutor(
db=self._db,
node_def=self._node_def,
data=data,
resolve_prop_key_id=self._resolve_prop_key_id,
use_batch=True,
)
elif kwargs:
data = {**data, **kwargs}
return InsertExecutor(
db=self._db,
node_def=self._node_def,
data=data,
resolve_prop_key_id=self._resolve_prop_key_id,
)
def values_many(self, data: List[Dict[str, Any]], *, batch: bool = True) -> InsertExecutor[N]:
return InsertExecutor(
db=self._db,
node_def=self._node_def,
data=data,
resolve_prop_key_id=self._resolve_prop_key_id,
use_batch=batch,
)
class UpsertExecutor(Generic[N]):
def __init__(
self,
db: Database,
node_def: N,
data: Union[Dict[str, Any], List[Dict[str, Any]]],
resolve_prop_key_id: Callable[[NodeDef, str], int],
):
self._db = db
self._node_def = node_def
self._data = data if isinstance(data, list) else [data]
self._is_single = not isinstance(data, list)
self._resolve_prop_key_id = resolve_prop_key_id
def returning(self) -> Union[NodeRef[N], List[NodeRef[N]]]:
from kitedb._kitedb import PropValue
results: List[NodeRef[N]] = []
in_tx = self._db.has_transaction()
if not in_tx:
self._db.begin()
try:
for item in self._data:
item_copy = dict(item)
key_arg = item_copy.pop("key", None)
if key_arg is None:
raise ValueError("Upsert requires a 'key' field")
full_key = self._node_def.key_fn(key_arg)
prop_updates: List[tuple[int, Optional[PropValue]]] = []
for prop_name, value in item_copy.items():
prop_def = self._node_def.props.get(prop_name)
if prop_def is None:
continue
prop_key_id = self._resolve_prop_key_id(self._node_def, prop_name)
if value is None:
prop_updates.append((prop_key_id, None))
else:
prop_value = to_prop_value(prop_def, value, PropValue)
prop_updates.append((prop_key_id, prop_value))
node_id = self._db.upsert_node(full_key, prop_updates)
props: Dict[str, Any] = {}
all_props = self._db.get_node_props(node_id)
if all_props is not None:
key_id_to_name = {v: k for k, v in self._node_def._prop_key_ids.items()}
for node_prop in all_props:
prop_name = key_id_to_name.get(node_prop.key_id)
if prop_name is not None:
props[prop_name] = from_prop_value(node_prop.value)
results.append(NodeRef(
id=node_id,
key=full_key,
node_def=self._node_def,
props=props,
))
if not in_tx:
self._db.commit()
except Exception:
if not in_tx:
self._db.rollback()
raise
return results[0] if self._is_single else results
def execute(self) -> None:
self.returning()
class UpsertBuilder(Generic[N]):
def __init__(
self,
db: Database,
node_def: N,
resolve_prop_key_id: Callable[[NodeDef, str], int],
):
self._db = db
self._node_def = node_def
self._resolve_prop_key_id = resolve_prop_key_id
def values(
self,
data: Optional[Union[Dict[str, Any], List[Dict[str, Any]]]] = None,
**kwargs: Any,
) -> UpsertExecutor[N]:
if data is None:
data = kwargs
elif isinstance(data, list):
if kwargs:
raise ValueError("Cannot combine list data with keyword arguments")
return UpsertExecutor(
db=self._db,
node_def=self._node_def,
data=data,
resolve_prop_key_id=self._resolve_prop_key_id,
)
elif kwargs:
data = {**data, **kwargs}
return UpsertExecutor(
db=self._db,
node_def=self._node_def,
data=data,
resolve_prop_key_id=self._resolve_prop_key_id,
)
def values_many(self, data: List[Dict[str, Any]]) -> UpsertExecutor[N]:
return UpsertExecutor(
db=self._db,
node_def=self._node_def,
data=data,
resolve_prop_key_id=self._resolve_prop_key_id,
)
class UpdateExecutor(Generic[N]):
def __init__(
self,
db: Database,
node_def: N,
data: Dict[str, Any],
resolve_prop_key_id: Callable[[NodeDef, str], int],
):
self._db = db
self._node_def = node_def
self._data = data
self._resolve_prop_key_id = resolve_prop_key_id
self._where_id: Optional[int] = None
self._where_key: Optional[str] = None
def where(
self,
*,
id: Optional[int] = None,
key: Optional[str] = None,
) -> UpdateExecutor[N]:
self._where_id = id
self._where_key = key
return self
def execute(self) -> None:
from kitedb._kitedb import PropValue
if self._where_id is None and self._where_key is None:
raise ValueError("Update requires a where condition (id or key)")
node_id: Optional[int] = self._where_id
if node_id is None and self._where_key:
node_id = self._db.get_node_by_key(self._where_key)
if node_id is None:
raise ValueError(f"Node not found: {self._where_key}")
resolved_node_id: int = node_id
in_tx = self._db.has_transaction()
if not in_tx:
self._db.begin()
try:
for prop_name, value in self._data.items():
prop_def = self._node_def.props.get(prop_name)
if prop_def is None:
continue
prop_key_id = self._resolve_prop_key_id(self._node_def, prop_name)
if value is None:
self._db.delete_node_prop(resolved_node_id, prop_key_id)
else:
prop_value = to_prop_value(prop_def, value, PropValue)
self._db.set_node_prop(resolved_node_id, prop_key_id, prop_value)
if not in_tx:
self._db.commit()
except Exception:
if not in_tx:
self._db.rollback()
raise
class UpdateByRefExecutor:
def __init__(
self,
db: Database,
node_ref: NodeRef[Any],
data: Dict[str, Any],
resolve_prop_key_id: Callable[[NodeDef, str], int],
):
self._db = db
self._node_ref = node_ref
self._data = data
self._resolve_prop_key_id = resolve_prop_key_id
def execute(self) -> None:
from kitedb._kitedb import PropValue
in_tx = self._db.has_transaction()
if not in_tx:
self._db.begin()
try:
for prop_name, value in self._data.items():
prop_def = self._node_ref.node_def.props.get(prop_name)
if prop_def is None:
continue
prop_key_id = self._resolve_prop_key_id(self._node_ref.node_def, prop_name)
if value is None:
self._db.delete_node_prop(self._node_ref.id, prop_key_id)
else:
prop_value = to_prop_value(prop_def, value, PropValue)
self._db.set_node_prop(self._node_ref.id, prop_key_id, prop_value)
if not in_tx:
self._db.commit()
except Exception:
if not in_tx:
self._db.rollback()
raise
class UpdateBuilder(Generic[N]):
def __init__(
self,
db: Database,
node_def: N,
resolve_prop_key_id: Callable[[NodeDef, str], int],
):
self._db = db
self._node_def = node_def
self._resolve_prop_key_id = resolve_prop_key_id
def set(self, data: Optional[Dict[str, Any]] = None, **kwargs: Any) -> UpdateExecutor[N]:
if data is None:
data = kwargs
elif kwargs:
data = {**data, **kwargs}
return UpdateExecutor(
db=self._db,
node_def=self._node_def,
data=data,
resolve_prop_key_id=self._resolve_prop_key_id,
)
class UpdateByRefBuilder:
def __init__(
self,
db: Database,
node_ref: NodeRef[Any],
resolve_prop_key_id: Callable[[NodeDef, str], int],
):
self._db = db
self._node_ref = node_ref
self._resolve_prop_key_id = resolve_prop_key_id
def set(self, data: Optional[Dict[str, Any]] = None, **kwargs: Any) -> UpdateByRefExecutor:
if data is None:
data = kwargs
elif kwargs:
data = {**data, **kwargs}
return UpdateByRefExecutor(
db=self._db,
node_ref=self._node_ref,
data=data,
resolve_prop_key_id=self._resolve_prop_key_id,
)
class UpsertByIdExecutor:
def __init__(
self,
db: Database,
node_def: NodeDef[Any],
node_id: int,
data: Dict[str, Any],
resolve_prop_key_id: Callable[[NodeDef, str], int],
):
self._db = db
self._node_def = node_def
self._node_id = node_id
self._data = data
self._resolve_prop_key_id = resolve_prop_key_id
def execute(self) -> None:
from kitedb._kitedb import PropValue
prop_updates: List[Tuple[int, Optional[PropValue]]] = []
for prop_name, value in self._data.items():
prop_def = self._node_def.props.get(prop_name)
if prop_def is None:
continue
prop_key_id = self._resolve_prop_key_id(self._node_def, prop_name)
if value is None:
prop_updates.append((prop_key_id, None))
else:
prop_value = to_prop_value(prop_def, value, PropValue)
prop_updates.append((prop_key_id, prop_value))
in_tx = self._db.has_transaction()
if not in_tx:
self._db.begin()
try:
self._db.upsert_node_by_id(self._node_id, prop_updates)
if not in_tx:
self._db.commit()
except Exception:
if not in_tx:
self._db.rollback()
raise
class UpsertByIdBuilder(Generic[N]):
def __init__(
self,
db: Database,
node_def: N,
node_id: int,
resolve_prop_key_id: Callable[[NodeDef, str], int],
):
self._db = db
self._node_def = node_def
self._node_id = node_id
self._resolve_prop_key_id = resolve_prop_key_id
def set(self, data: Optional[Dict[str, Any]] = None, **kwargs: Any) -> UpsertByIdExecutor:
if data is None:
data = kwargs
elif kwargs:
data = {**data, **kwargs}
return UpsertByIdExecutor(
db=self._db,
node_def=self._node_def,
node_id=self._node_id,
data=data,
resolve_prop_key_id=self._resolve_prop_key_id,
)
class DeleteExecutor:
def __init__(self, db: Database):
self._db = db
self._where_id: Optional[int] = None
self._where_key: Optional[str] = None
def where(
self,
*,
id: Optional[int] = None,
key: Optional[str] = None,
) -> DeleteExecutor:
self._where_id = id
self._where_key = key
return self
def execute(self) -> bool:
if self._where_id is None and self._where_key is None:
raise ValueError("Delete requires a where condition (id or key)")
node_id: Optional[int] = self._where_id
if node_id is None and self._where_key:
node_id = self._db.get_node_by_key(self._where_key)
if node_id is None:
return False
resolved_node_id: int = node_id
in_tx = self._db.has_transaction()
if not in_tx:
self._db.begin()
try:
self._db.delete_node(resolved_node_id)
if not in_tx:
self._db.commit()
return True
except Exception:
if not in_tx:
self._db.rollback()
raise
class DeleteBuilder(Generic[N]):
def __init__(self, db: Database, node_def: N):
self._db = db
self._node_def = node_def
def where(
self,
*,
id: Optional[int] = None,
key: Optional[str] = None,
) -> DeleteExecutor:
executor = DeleteExecutor(self._db)
return executor.where(id=id, key=key)
def create_link(
db: Database,
src: NodeRef[Any],
edge_def: EdgeDef,
dst: NodeRef[Any],
props: Optional[Dict[str, Any]],
resolve_etype_id: Callable[[EdgeDef], int],
resolve_prop_key_id: Callable[[EdgeDef, str], int],
) -> None:
from kitedb._kitedb import PropValue
etype_id = resolve_etype_id(edge_def)
in_tx = db.has_transaction()
if not in_tx:
db.begin()
try:
db.add_edge(src.id, etype_id, dst.id)
if props:
for prop_name, value in props.items():
if value is None:
continue
prop_def = edge_def.props.get(prop_name)
if prop_def is None:
continue
prop_key_id = resolve_prop_key_id(edge_def, prop_name)
prop_value = to_prop_value(prop_def, value, PropValue)
db.set_edge_prop(src.id, etype_id, dst.id, prop_key_id, prop_value)
if not in_tx:
db.commit()
except Exception:
if not in_tx:
db.rollback()
raise
def delete_link(
db: Database,
src: NodeRef[Any],
edge_def: EdgeDef,
dst: NodeRef[Any],
resolve_etype_id: Callable[[EdgeDef], int],
) -> None:
etype_id = resolve_etype_id(edge_def)
in_tx = db.has_transaction()
if not in_tx:
db.begin()
try:
db.delete_edge(src.id, etype_id, dst.id)
if not in_tx:
db.commit()
except Exception:
if not in_tx:
db.rollback()
raise
class UpdateEdgeExecutor:
def __init__(
self,
db: Database,
src: NodeRef[Any],
edge_def: EdgeDef,
dst: NodeRef[Any],
data: Dict[str, Any],
resolve_etype_id: Callable[[EdgeDef], int],
resolve_prop_key_id: Callable[[EdgeDef, str], int],
):
self._db = db
self._src = src
self._edge_def = edge_def
self._dst = dst
self._data = data
self._resolve_etype_id = resolve_etype_id
self._resolve_prop_key_id = resolve_prop_key_id
def execute(self) -> None:
from kitedb._kitedb import PropValue
etype_id = self._resolve_etype_id(self._edge_def)
in_tx = self._db.has_transaction()
if not in_tx:
self._db.begin()
try:
for prop_name, value in self._data.items():
prop_def = self._edge_def.props.get(prop_name)
if prop_def is None:
continue
prop_key_id = self._resolve_prop_key_id(self._edge_def, prop_name)
if value is None:
self._db.delete_edge_prop(
self._src.id, etype_id, self._dst.id, prop_key_id
)
else:
prop_value = to_prop_value(prop_def, value, PropValue)
self._db.set_edge_prop(
self._src.id, etype_id, self._dst.id, prop_key_id, prop_value
)
if not in_tx:
self._db.commit()
except Exception:
if not in_tx:
self._db.rollback()
raise
class UpdateEdgeBuilder(Generic[E]):
def __init__(
self,
db: Database,
src: NodeRef[Any],
edge_def: E,
dst: NodeRef[Any],
resolve_etype_id: Callable[[EdgeDef], int],
resolve_prop_key_id: Callable[[EdgeDef, str], int],
):
self._db = db
self._src = src
self._edge_def = edge_def
self._dst = dst
self._resolve_etype_id = resolve_etype_id
self._resolve_prop_key_id = resolve_prop_key_id
def set(self, data: Optional[Dict[str, Any]] = None, **kwargs: Any) -> UpdateEdgeExecutor:
if data is None:
data = kwargs
else:
data = {**data, **kwargs}
return UpdateEdgeExecutor(
db=self._db,
src=self._src,
edge_def=self._edge_def,
dst=self._dst,
data=data,
resolve_etype_id=self._resolve_etype_id,
resolve_prop_key_id=self._resolve_prop_key_id,
)
class UpsertEdgeExecutor:
def __init__(
self,
db: Database,
src: NodeRef[Any],
edge_def: EdgeDef,
dst: NodeRef[Any],
data: Dict[str, Any],
resolve_etype_id: Callable[[EdgeDef], int],
resolve_prop_key_id: Callable[[EdgeDef, str], int],
):
self._db = db
self._src = src
self._edge_def = edge_def
self._dst = dst
self._data = data
self._resolve_etype_id = resolve_etype_id
self._resolve_prop_key_id = resolve_prop_key_id
def execute(self) -> None:
from kitedb._kitedb import PropValue
etype_id = self._resolve_etype_id(self._edge_def)
in_tx = self._db.has_transaction()
if not in_tx:
self._db.begin()
try:
if not self._db.edge_exists(self._src.id, etype_id, self._dst.id):
self._db.add_edge(self._src.id, etype_id, self._dst.id)
for prop_name, value in self._data.items():
prop_def = self._edge_def.props.get(prop_name)
if prop_def is None:
continue
prop_key_id = self._resolve_prop_key_id(self._edge_def, prop_name)
if value is None:
self._db.delete_edge_prop(
self._src.id, etype_id, self._dst.id, prop_key_id
)
else:
prop_value = to_prop_value(prop_def, value, PropValue)
self._db.set_edge_prop(
self._src.id, etype_id, self._dst.id, prop_key_id, prop_value
)
if not in_tx:
self._db.commit()
except Exception:
if not in_tx:
self._db.rollback()
raise
class UpsertEdgeBuilder(Generic[E]):
def __init__(
self,
db: Database,
src: NodeRef[Any],
edge_def: E,
dst: NodeRef[Any],
resolve_etype_id: Callable[[EdgeDef], int],
resolve_prop_key_id: Callable[[EdgeDef, str], int],
):
self._db = db
self._src = src
self._edge_def = edge_def
self._dst = dst
self._resolve_etype_id = resolve_etype_id
self._resolve_prop_key_id = resolve_prop_key_id
def set(self, data: Optional[Dict[str, Any]] = None, **kwargs: Any) -> UpsertEdgeExecutor:
if data is None:
data = kwargs
else:
data = {**data, **kwargs}
return UpsertEdgeExecutor(
db=self._db,
src=self._src,
edge_def=self._edge_def,
dst=self._dst,
data=data,
resolve_etype_id=self._resolve_etype_id,
resolve_prop_key_id=self._resolve_prop_key_id,
)
__all__ = [
"NodeRef",
"InsertBuilder",
"InsertExecutor",
"UpdateBuilder",
"UpdateExecutor",
"UpdateByRefBuilder",
"UpdateByRefExecutor",
"UpsertByIdBuilder",
"UpsertByIdExecutor",
"DeleteBuilder",
"DeleteExecutor",
"UpdateEdgeBuilder",
"UpdateEdgeExecutor",
"UpsertEdgeBuilder",
"UpsertEdgeExecutor",
"create_link",
"delete_link",
"to_prop_value",
"from_prop_value",
]