use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use crate::storage::schema::Value;
/// Opaque identifier of a [`UnifiedEntity`].
///
/// A newtype over `u64`, so entity ids cannot be confused with row ids,
/// sequence ids or transaction ids at the type level.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntityId(pub u64);

impl EntityId {
    /// Wraps a raw `u64` as an entity id.
    pub fn new(id: u64) -> Self {
        EntityId(id)
    }

    /// Returns the wrapped `u64`.
    pub fn raw(&self) -> u64 {
        let EntityId(value) = *self;
        value
    }
}

impl fmt::Display for EntityId {
    /// Renders the id as `e<number>`, e.g. `e42`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let value = self.raw();
        write!(f, "e{}", value)
    }
}

impl From<u64> for EntityId {
    /// Equivalent to [`EntityId::new`].
    fn from(id: u64) -> Self {
        EntityId::new(id)
    }
}
/// Says which storage engine owns an entity and where it lives inside that engine.
///
/// The graph and time-series variants carry their data boxed (presumably to
/// keep the enum itself small).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EntityKind {
    TableRow { table: Arc<str>, row_id: u64 },
    GraphNode(Box<GraphNodeKind>),
    GraphEdge(Box<GraphEdgeKind>),
    Vector { collection: String },
    TimeSeriesPoint(Box<TimeSeriesPointKind>),
    QueueMessage { queue: String, position: u64 },
}

/// Location data for a graph node.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GraphNodeKind {
    /// Label, reported as the node's collection.
    pub label: String,
    pub node_type: String,
}

/// Location data for a graph edge.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GraphEdgeKind {
    /// Label, reported as the edge's collection.
    pub label: String,
    pub from_node: String,
    pub to_node: String,
    /// Integer weight. `UnifiedEntity::graph_edge` stores `(f32 weight * 1000) as u32` here.
    pub weight: u32,
}

/// Location data for a time-series point.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TimeSeriesPointKind {
    /// Series name, reported as the point's collection.
    pub series: String,
    pub metric: String,
}

impl EntityKind {
    /// Short, fixed name of the storage engine for this kind of entity.
    pub fn storage_type(&self) -> &'static str {
        match self {
            EntityKind::TableRow { .. } => "table",
            EntityKind::GraphNode(_) => "graph_node",
            EntityKind::GraphEdge(_) => "graph_edge",
            EntityKind::Vector { .. } => "vector",
            EntityKind::TimeSeriesPoint(_) => "timeseries",
            EntityKind::QueueMessage { .. } => "queue",
        }
    }

    /// Name of the container the entity belongs to.
    ///
    /// This is the table, node/edge label, vector collection, series or queue.
    pub fn collection(&self) -> &str {
        let name: &str = match self {
            EntityKind::TableRow { table, .. } => &**table,
            EntityKind::GraphNode(node) => node.label.as_str(),
            EntityKind::GraphEdge(edge) => edge.label.as_str(),
            EntityKind::Vector { collection } => collection.as_str(),
            EntityKind::TimeSeriesPoint(point) => point.series.as_str(),
            EntityKind::QueueMessage { queue, .. } => queue.as_str(),
        };
        name
    }
}
/// Payload of an entity; one variant per storage engine.
#[derive(Debug, Clone)]
pub enum EntityData {
    Row(RowData),
    Node(NodeData),
    Edge(EdgeData),
    Vector(VectorData),
    TimeSeries(TimeSeriesData),
    QueueMessage(QueueMessageData),
}

impl EntityData {
    /// `true` when the payload is a table row.
    pub fn is_row(&self) -> bool {
        self.as_row().is_some()
    }

    /// `true` when the payload is a graph node.
    pub fn is_node(&self) -> bool {
        self.as_node().is_some()
    }

    /// `true` when the payload is a graph edge.
    pub fn is_edge(&self) -> bool {
        self.as_edge().is_some()
    }

    /// `true` when the payload is a vector.
    pub fn is_vector(&self) -> bool {
        self.as_vector().is_some()
    }

    /// Borrows the row payload, or `None` for any other variant.
    pub fn as_row(&self) -> Option<&RowData> {
        if let EntityData::Row(row) = self {
            Some(row)
        } else {
            None
        }
    }

    /// Borrows the node payload, or `None` for any other variant.
    pub fn as_node(&self) -> Option<&NodeData> {
        if let EntityData::Node(node) = self {
            Some(node)
        } else {
            None
        }
    }

    /// Borrows the edge payload, or `None` for any other variant.
    pub fn as_edge(&self) -> Option<&EdgeData> {
        if let EntityData::Edge(edge) = self {
            Some(edge)
        } else {
            None
        }
    }

    /// Borrows the vector payload, or `None` for any other variant.
    pub fn as_vector(&self) -> Option<&VectorData> {
        if let EntityData::Vector(vector) = self {
            Some(vector)
        } else {
            None
        }
    }
}
/// A table row: positional column values, optionally addressable by name.
#[derive(Debug, Clone)]
pub struct RowData {
    /// Column values in positional order.
    pub columns: Vec<Value>,
    /// Optional name -> value map (built by `with_names` from clones of `columns`).
    pub named: Option<HashMap<String, Value>>,
    /// Optional shared column-name list; `schema[i]` names `columns[i]`.
    pub schema: Option<std::sync::Arc<Vec<String>>>,
}

impl RowData {
    /// Positional row with neither names nor a schema attached.
    pub fn new(columns: Vec<Value>) -> Self {
        RowData {
            columns,
            named: None,
            schema: None,
        }
    }

    /// Row whose values can also be looked up by name.
    ///
    /// `names[i]` labels `columns[i]`. The shorter list bounds the pairing,
    /// and a repeated name keeps the value of its last occurrence.
    pub fn with_names(columns: Vec<Value>, names: Vec<String>) -> Self {
        let mut named: HashMap<String, Value> = HashMap::new();
        for (name, value) in names.into_iter().zip(columns.iter()) {
            named.insert(name, value.clone());
        }
        RowData {
            columns,
            named: Some(named),
            schema: None,
        }
    }

    /// Looks a field up by name.
    ///
    /// The `named` map wins whenever it is present; only without it is
    /// `schema` scanned to find the column index.
    pub fn get_field(&self, name: &str) -> Option<&Value> {
        match (&self.named, &self.schema) {
            (Some(named), _) => named.get(name),
            (None, Some(schema)) => schema
                .iter()
                .position(|column| column == name)
                .and_then(|idx| self.columns.get(idx)),
            (None, None) => None,
        }
    }

    /// Iterates `(name, value)` pairs.
    ///
    /// Uses the `named` map when present (in hash-map order), otherwise
    /// `schema` zipped with `columns`. Yields nothing when neither exists.
    pub fn iter_fields(&self) -> Box<dyn Iterator<Item = (&str, &Value)> + '_> {
        match (&self.named, &self.schema) {
            (Some(named), _) => Box::new(named.iter().map(|(key, value)| (key.as_str(), value))),
            (None, Some(schema)) => Box::new(
                schema
                    .iter()
                    .map(String::as_str)
                    .zip(self.columns.iter()),
            ),
            (None, None) => Box::new(std::iter::empty()),
        }
    }

    /// Positional access; `None` when `index` is out of range.
    pub fn get(&self, index: usize) -> Option<&Value> {
        self.columns.get(index)
    }

    /// Name lookup through the `named` map only (the schema is not consulted).
    pub fn get_by_name(&self, name: &str) -> Option<&Value> {
        self.named.as_ref().and_then(|named| named.get(name))
    }

    /// Number of positional columns.
    pub fn len(&self) -> usize {
        self.columns.len()
    }

    /// `true` when the row has no columns.
    pub fn is_empty(&self) -> bool {
        self.columns.is_empty()
    }
}
/// Property bag attached to a graph node.
#[derive(Debug, Clone)]
pub struct NodeData {
    pub properties: HashMap<String, Value>,
}

impl NodeData {
    /// Node with no properties.
    pub fn new() -> Self {
        Self::with_properties(HashMap::new())
    }

    /// Node taking ownership of an existing property map.
    pub fn with_properties(properties: HashMap<String, Value>) -> Self {
        NodeData { properties }
    }

    /// Inserts or overwrites property `key`.
    pub fn set(&mut self, key: impl Into<String>, value: Value) {
        let key: String = key.into();
        self.properties.insert(key, value);
    }

    /// Borrows property `key`, if present.
    pub fn get(&self, key: &str) -> Option<&Value> {
        self.properties.get(key)
    }

    /// `true` when property `key` is present.
    pub fn has(&self, key: &str) -> bool {
        self.get(key).is_some()
    }
}

impl Default for NodeData {
    /// Same as [`NodeData::new`].
    fn default() -> Self {
        NodeData::new()
    }
}
/// Property bag plus floating-point weight attached to a graph edge.
#[derive(Debug, Clone)]
pub struct EdgeData {
    pub properties: HashMap<String, Value>,
    pub weight: f32,
}

impl EdgeData {
    /// Edge with the given weight and no properties.
    pub fn new(weight: f32) -> Self {
        Self::with_properties(weight, HashMap::new())
    }

    /// Edge with the given weight, taking ownership of `properties`.
    pub fn with_properties(weight: f32, properties: HashMap<String, Value>) -> Self {
        EdgeData { properties, weight }
    }

    /// Inserts or overwrites property `key`.
    pub fn set(&mut self, key: impl Into<String>, value: Value) {
        let key: String = key.into();
        self.properties.insert(key, value);
    }

    /// Borrows property `key`, if present.
    pub fn get(&self, key: &str) -> Option<&Value> {
        self.properties.get(key)
    }
}

impl Default for EdgeData {
    /// Weight `1.0`, no properties.
    fn default() -> Self {
        EdgeData::new(1.0)
    }
}
/// Vector payload: a dense embedding, an optional sparse companion and optional source text.
#[derive(Debug, Clone)]
pub struct VectorData {
    pub dense: Vec<f32>,
    pub sparse: Option<SparseVector>,
    pub content: Option<String>,
}

impl VectorData {
    /// Dense-only vector without attached content.
    pub fn new(dense: Vec<f32>) -> Self {
        VectorData {
            dense,
            sparse: None,
            content: None,
        }
    }

    /// Dense vector paired with a sparse one (a "hybrid" vector).
    pub fn with_sparse(dense: Vec<f32>, sparse: SparseVector) -> Self {
        let mut data = Self::new(dense);
        data.sparse = Some(sparse);
        data
    }

    /// Builder-style setter for the attached content text.
    pub fn with_content(self, content: impl Into<String>) -> Self {
        VectorData {
            content: Some(content.into()),
            ..self
        }
    }

    /// Length of the dense component.
    pub fn dimension(&self) -> usize {
        self.dense.len()
    }

    /// `true` when a sparse component is present.
    pub fn is_hybrid(&self) -> bool {
        matches!(self.sparse, Some(_))
    }
}
/// Payload of a single time-series sample.
#[derive(Debug, Clone)]
pub struct TimeSeriesData {
/// Metric name of this sample.
pub metric: String,
/// Sample timestamp. The `_ns` suffix suggests nanoseconds; the unit is not enforced here.
pub timestamp_ns: u64,
/// Sampled value.
pub value: f64,
/// String key/value tags attached to the sample.
pub tags: std::collections::HashMap<String, String>,
}
/// Payload of a queue message.
///
/// Nothing in this type enforces the relationships between its fields
/// (e.g. `attempts <= max_attempts`); that is left to the queue implementation.
#[derive(Debug, Clone)]
pub struct QueueMessageData {
/// Message body.
pub payload: Value,
/// Optional priority. Whether higher or lower values go first is decided outside this type.
pub priority: Option<i32>,
/// Enqueue time. The `_ns` suffix suggests nanoseconds; not enforced here.
pub enqueued_at_ns: u64,
/// Delivery-attempt counter (presumably bumped by the consumer path; not shown here).
pub attempts: u32,
/// Intended upper bound for `attempts`.
pub max_attempts: u32,
/// Whether the message has been acknowledged.
pub acked: bool,
}
/// Sparse vector in coordinate form.
///
/// `indices[k]` holds the value `values[k]`. Every other coordinate in
/// `0..dimension` is implicitly `0.0`. Indices are not required to be sorted.
#[derive(Debug, Clone)]
pub struct SparseVector {
    /// Positions of the stored entries.
    pub indices: Vec<u32>,
    /// Values paired element-wise with `indices`.
    pub values: Vec<f32>,
    /// Logical length of the equivalent dense vector.
    pub dimension: usize,
}

impl SparseVector {
    /// Builds a sparse vector from parallel `indices` / `values` arrays.
    ///
    /// `indices.len() == values.len()` is only checked in debug builds. The
    /// fields are public, so callers can also build mismatched instances
    /// directly. The accessors below therefore never assume the lengths match.
    pub fn new(indices: Vec<u32>, values: Vec<f32>, dimension: usize) -> Self {
        debug_assert_eq!(indices.len(), values.len());
        Self {
            indices,
            values,
            dimension,
        }
    }

    /// Number of stored entries.
    pub fn nnz(&self) -> usize {
        self.indices.len()
    }

    /// Fraction of coordinates that are not stored: `1 - nnz / dimension`.
    ///
    /// Returns `1.0` when `dimension` is 0. If more entries than `dimension`
    /// are stored, the result goes negative.
    pub fn sparsity(&self) -> f32 {
        if self.dimension == 0 {
            1.0
        } else {
            1.0 - (self.nnz() as f32 / self.dimension as f32)
        }
    }

    /// Value at coordinate `index`, or `0.0` when it is not stored.
    ///
    /// This is a linear scan, because the indices are not guaranteed sorted.
    /// If an index is duplicated, its first occurrence wins. Indices and
    /// values are paired with `zip`, so an index whose value is missing
    /// (mismatched lengths) reads as `0.0` instead of panicking on an
    /// out-of-bounds `values[pos]`.
    pub fn get(&self, index: u32) -> f32 {
        self.indices
            .iter()
            .zip(&self.values)
            .find(|&(&i, _)| i == index)
            .map(|(_, &value)| value)
            .unwrap_or(0.0)
    }
}
/// A named embedding attached to an entity, tagged with the model that produced it.
#[derive(Debug, Clone)]
pub struct EmbeddingSlot {
    pub name: String,
    pub vector: Vec<f32>,
    pub model: String,
    /// Length of `vector` at construction time.
    pub dimension: usize,
    /// Unix seconds at construction time.
    pub generated_at: u64,
}

/// Whole seconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch.
fn current_unix_secs() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}

impl EmbeddingSlot {
    /// Creates a slot stamped with the current time.
    ///
    /// `dimension` is taken from `vector`'s length.
    pub fn new(name: impl Into<String>, vector: Vec<f32>, model: impl Into<String>) -> Self {
        let generated_at = current_unix_secs();
        EmbeddingSlot {
            name: name.into(),
            model: model.into(),
            dimension: vector.len(),
            vector,
            generated_at,
        }
    }
}
/// One record in the unified store: identity, location, timestamps, payload,
/// MVCC markers and lazily-allocated auxiliary data.
#[derive(Debug, Clone)]
pub struct UnifiedEntity {
/// Entity identifier.
pub id: EntityId,
/// Storage engine and location of the entity.
pub kind: EntityKind,
/// Unix seconds when the entity was constructed (set by `UnifiedEntity::new`).
pub created_at: u64,
/// Unix seconds of the last change recorded via `add_embedding` / `add_cross_ref`; starts equal to `created_at`.
pub updated_at: u64,
/// Engine-specific payload.
pub data: EntityData,
/// Starts at 0 in `new`; presumably assigned by the storage layer (not shown here).
pub sequence_id: u64,
/// 64-bit field-name mask from `compute_entity_field_bloom`, computed once in `new`.
/// NOTE(review): not recomputed if `data` is mutated later through the public field.
pub field_bloom: u64,
/// Creating transaction id; 0 means "unset" and is treated as visible by `is_visible`.
pub xmin: u64,
/// Deleting transaction id; 0 means "not deleted" for `is_visible`.
pub xmax: u64,
/// Embeddings and cross-references; `None` until first requested mutably, to avoid the allocation for plain entities.
aux: Option<Box<EntityAux>>,
}
/// Rarely-used per-entity data, kept behind a `Box` in `UnifiedEntity::aux`.
#[derive(Debug, Clone, Default)]
pub struct EntityAux {
/// Named embeddings attached to the entity.
pub embeddings: Vec<EmbeddingSlot>,
/// Outgoing links to other entities.
pub cross_refs: Vec<CrossRef>,
}
impl UnifiedEntity {
pub fn embeddings(&self) -> &[EmbeddingSlot] {
self.aux
.as_ref()
.map(|a| a.embeddings.as_slice())
.unwrap_or(&[])
}
pub fn cross_refs(&self) -> &[CrossRef] {
self.aux
.as_ref()
.map(|a| a.cross_refs.as_slice())
.unwrap_or(&[])
}
pub fn embeddings_mut(&mut self) -> &mut Vec<EmbeddingSlot> {
&mut self.aux.get_or_insert_with(Default::default).embeddings
}
pub fn cross_refs_mut(&mut self) -> &mut Vec<CrossRef> {
&mut self.aux.get_or_insert_with(Default::default).cross_refs
}
pub fn has_aux(&self) -> bool {
self.aux.is_some()
}
}
/// Single-bit bloom signature of a field name.
///
/// The bit is chosen by the low 6 bits of the name's middle byte
/// (`bytes[len / 2]`). An empty name contributes no bit.
#[inline]
pub fn field_name_bloom(name: &str) -> u64 {
    let bytes = name.as_bytes();
    match bytes.get(bytes.len() / 2) {
        Some(&middle) => 1u64 << (middle & 63),
        None => 0,
    }
}
/// OR of [`field_name_bloom`] over every field name carried by `data`.
///
/// Rows with a schema yield `0` (even if they also have a `named` map).
/// Rows carrying only positional columns, and all non-row/node/edge payloads,
/// also yield `0`.
pub fn compute_entity_field_bloom(data: &EntityData) -> u64 {
    /// Folds the per-name bloom bits of `keys` into one mask.
    fn fold_keys<'a>(keys: impl Iterator<Item = &'a String>) -> u64 {
        keys.map(|key| field_name_bloom(key))
            .fold(0u64, |mask, bit| mask | bit)
    }

    match data {
        EntityData::Row(row) if row.schema.is_some() => 0,
        EntityData::Row(row) => row
            .named
            .as_ref()
            .map_or(0, |named| fold_keys(named.keys())),
        EntityData::Node(node) => fold_keys(node.properties.keys()),
        EntityData::Edge(edge) => fold_keys(edge.properties.keys()),
        _ => 0,
    }
}
impl UnifiedEntity {
    /// Creates an entity stamped with the current Unix time.
    ///
    /// The field bloom is precomputed from `data`. Sequence id and MVCC
    /// markers start at 0, and no auxiliary data is allocated.
    pub fn new(id: EntityId, kind: EntityKind, data: EntityData) -> Self {
        let timestamp = current_unix_secs();
        Self {
            field_bloom: compute_entity_field_bloom(&data),
            id,
            kind,
            data,
            created_at: timestamp,
            updated_at: timestamp,
            sequence_id: 0,
            xmin: 0,
            xmax: 0,
            aux: None,
        }
    }

    /// MVCC visibility check against `snapshot_xid`.
    ///
    /// The entity is hidden when it was created by a later transaction
    /// (`xmin > snapshot_xid`), or when it was deleted at or before the
    /// snapshot (`xmax <= snapshot_xid`). A marker of 0 means "unset" and
    /// never hides the entity.
    #[inline]
    pub fn is_visible(&self, snapshot_xid: u64) -> bool {
        let created_after_snapshot = self.xmin != 0 && self.xmin > snapshot_xid;
        let deleted_by_snapshot = self.xmax != 0 && self.xmax <= snapshot_xid;
        !(created_after_snapshot || deleted_by_snapshot)
    }

    /// Records the creating transaction id.
    #[inline]
    pub fn set_xmin(&mut self, xid: u64) {
        self.xmin = xid;
    }

    /// Records the deleting transaction id.
    #[inline]
    pub fn set_xmax(&mut self, xid: u64) {
        self.xmax = xid;
    }

    /// Builds a positional table-row entity.
    pub fn table_row(
        id: EntityId,
        table: impl Into<Arc<str>>,
        row_id: u64,
        columns: Vec<Value>,
    ) -> Self {
        let kind = EntityKind::TableRow {
            table: table.into(),
            row_id,
        };
        let data = EntityData::Row(RowData::new(columns));
        Self::new(id, kind, data)
    }

    /// Builds a graph-node entity with the given properties.
    pub fn graph_node(
        id: EntityId,
        label: impl Into<String>,
        node_type: impl Into<String>,
        properties: HashMap<String, Value>,
    ) -> Self {
        let kind = EntityKind::GraphNode(Box::new(GraphNodeKind {
            label: label.into(),
            node_type: node_type.into(),
        }));
        let data = EntityData::Node(NodeData::with_properties(properties));
        Self::new(id, kind, data)
    }

    /// Builds a graph-edge entity.
    ///
    /// `weight` is kept as-is in the payload. The kind stores it scaled by
    /// 1000 and cast to `u32`; an `f32 -> u32` `as` cast saturates, so
    /// negative or NaN weights become 0.
    pub fn graph_edge(
        id: EntityId,
        label: impl Into<String>,
        from: impl Into<String>,
        to: impl Into<String>,
        weight: f32,
        properties: HashMap<String, Value>,
    ) -> Self {
        let scaled_weight = (weight * 1000.0) as u32;
        let kind = EntityKind::GraphEdge(Box::new(GraphEdgeKind {
            label: label.into(),
            from_node: from.into(),
            to_node: to.into(),
            weight: scaled_weight,
        }));
        let data = EntityData::Edge(EdgeData::with_properties(weight, properties));
        Self::new(id, kind, data)
    }

    /// Builds a dense-vector entity in `collection`.
    pub fn vector(id: EntityId, collection: impl Into<String>, vector: Vec<f32>) -> Self {
        let kind = EntityKind::Vector {
            collection: collection.into(),
        };
        Self::new(id, kind, EntityData::Vector(VectorData::new(vector)))
    }

    /// Appends an embedding and bumps `updated_at`.
    pub fn add_embedding(&mut self, slot: EmbeddingSlot) {
        let slots = self.embeddings_mut();
        slots.push(slot);
        self.touch();
    }

    /// Appends a cross-reference and bumps `updated_at`.
    pub fn add_cross_ref(&mut self, cross_ref: CrossRef) {
        let refs = self.cross_refs_mut();
        refs.push(cross_ref);
        self.touch();
    }

    /// First embedding whose name equals `name`.
    pub fn get_embedding(&self, name: &str) -> Option<&EmbeddingSlot> {
        self.embeddings()
            .iter()
            .find(|slot| slot.name.as_str() == name)
    }

    /// Sets `updated_at` to the current Unix time.
    fn touch(&mut self) {
        let now = current_unix_secs();
        self.updated_at = now;
    }

    /// `true` when strictly more than `max_age_secs` seconds passed since `updated_at`.
    ///
    /// Clock skew (an `updated_at` in the future) counts as age 0.
    pub fn is_stale(&self, max_age_secs: u64) -> bool {
        let age = current_unix_secs().saturating_sub(self.updated_at);
        age > max_age_secs
    }
}
/// A typed, weighted link from one entity to another.
#[derive(Debug, Clone, PartialEq)]
pub struct CrossRef {
    pub source: EntityId,
    pub target: EntityId,
    /// Collection the target lives in.
    pub target_collection: String,
    pub ref_type: RefType,
    pub weight: f32,
    /// Unix seconds at construction time.
    pub created_at: u64,
}

impl CrossRef {
    /// Link with the default weight of `1.0`.
    pub fn new(
        source: EntityId,
        target: EntityId,
        target_collection: impl Into<String>,
        ref_type: RefType,
    ) -> Self {
        Self::with_weight(source, target, target_collection, ref_type, 1.0)
    }

    /// Link with an explicit weight, stamped with the current Unix time.
    pub fn with_weight(
        source: EntityId,
        target: EntityId,
        target_collection: impl Into<String>,
        ref_type: RefType,
        weight: f32,
    ) -> Self {
        CrossRef {
            source,
            target,
            target_collection: target_collection.into(),
            ref_type,
            weight,
            created_at: current_unix_secs(),
        }
    }
}
/// Kind of link recorded by a `CrossRef`.
///
/// Directional variants are named `SourceToTarget`; `SimilarTo` and
/// `RelatedTo` are symmetric. Every variant has a fixed one-byte code
/// (`to_byte` / `try_from_byte`). If these bytes are persisted, keep existing
/// codes unchanged and give new variants fresh codes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RefType {
    RowToNode,
    RowToEdge,
    NodeToRow,
    RowToVector,
    VectorToRow,
    NodeToVector,
    EdgeToVector,
    VectorToNode,
    SimilarTo,
    RelatedTo,
    DerivesFrom,
    Mentions,
    Contains,
    DependsOn,
}

impl RefType {
    /// The link type pointing the opposite way, if one exists.
    ///
    /// Symmetric types are their own inverse. Types without a reverse
    /// variant return `None`.
    pub fn inverse(&self) -> Option<Self> {
        match self {
            Self::RowToNode => Some(Self::NodeToRow),
            Self::NodeToRow => Some(Self::RowToNode),
            Self::RowToVector => Some(Self::VectorToRow),
            Self::VectorToRow => Some(Self::RowToVector),
            Self::NodeToVector => Some(Self::VectorToNode),
            Self::VectorToNode => Some(Self::NodeToVector),
            Self::SimilarTo => Some(Self::SimilarTo),
            Self::RelatedTo => Some(Self::RelatedTo),
            // No reverse variant exists for these. They are listed explicitly
            // (no `_` arm) so adding a variant forces a decision here.
            Self::RowToEdge
            | Self::EdgeToVector
            | Self::DerivesFrom
            | Self::Mentions
            | Self::Contains
            | Self::DependsOn => None,
        }
    }

    /// `true` for link types that read the same in both directions.
    pub fn is_symmetric(&self) -> bool {
        matches!(self, Self::SimilarTo | Self::RelatedTo)
    }

    /// One-byte code for this link type (0..=13).
    pub fn to_byte(&self) -> u8 {
        match self {
            Self::RowToNode => 0,
            Self::RowToEdge => 1,
            Self::NodeToRow => 2,
            Self::RowToVector => 3,
            Self::VectorToRow => 4,
            Self::NodeToVector => 5,
            Self::EdgeToVector => 6,
            Self::VectorToNode => 7,
            Self::SimilarTo => 8,
            Self::RelatedTo => 9,
            Self::DerivesFrom => 10,
            Self::Mentions => 11,
            Self::Contains => 12,
            Self::DependsOn => 13,
        }
    }

    /// Strict inverse of [`RefType::to_byte`]: `None` for unknown codes.
    ///
    /// Use this where an unknown byte should be detected (e.g. corruption).
    pub fn try_from_byte(byte: u8) -> Option<Self> {
        let ref_type = match byte {
            0 => Self::RowToNode,
            1 => Self::RowToEdge,
            2 => Self::NodeToRow,
            3 => Self::RowToVector,
            4 => Self::VectorToRow,
            5 => Self::NodeToVector,
            6 => Self::EdgeToVector,
            7 => Self::VectorToNode,
            8 => Self::SimilarTo,
            9 => Self::RelatedTo,
            10 => Self::DerivesFrom,
            11 => Self::Mentions,
            12 => Self::Contains,
            13 => Self::DependsOn,
            _ => return None,
        };
        Some(ref_type)
    }

    /// Lenient decode: like [`RefType::try_from_byte`], except unknown codes
    /// fall back to `RelatedTo` instead of failing.
    pub fn from_byte(byte: u8) -> Self {
        Self::try_from_byte(byte).unwrap_or(Self::RelatedTo)
    }
}
impl From<Vec<Value>> for RowData {
fn from(columns: Vec<Value>) -> Self {
RowData::new(columns)
}
}
impl From<HashMap<String, Value>> for NodeData {
fn from(properties: HashMap<String, Value>) -> Self {
NodeData::with_properties(properties)
}
}
impl From<Vec<f32>> for VectorData {
fn from(dense: Vec<f32>) -> Self {
VectorData::new(dense)
}
}
impl From<(Vec<f32>, SparseVector)> for VectorData {
fn from((dense, sparse): (Vec<f32>, SparseVector)) -> Self {
VectorData::with_sparse(dense, sparse)
}
}
impl UnifiedEntity {
    /// Builds a graph-node entity from any iterator of `(key, value)` pairs.
    ///
    /// If a key repeats, the last pair wins.
    pub fn from_properties(
        id: EntityId,
        label: impl Into<String>,
        node_type: impl Into<String>,
        properties: impl IntoIterator<Item = (impl Into<String>, Value)>,
    ) -> Self {
        let props = properties
            .into_iter()
            .map(|(key, value)| (key.into(), value))
            .collect::<HashMap<String, Value>>();
        Self::graph_node(id, label, node_type, props)
    }

    /// Consumes the entity, yielding its row payload if it has one.
    pub fn into_row(self) -> Option<RowData> {
        if let EntityData::Row(row) = self.data {
            Some(row)
        } else {
            None
        }
    }

    /// Consumes the entity, yielding its node payload if it has one.
    pub fn into_node(self) -> Option<NodeData> {
        if let EntityData::Node(node) = self.data {
            Some(node)
        } else {
            None
        }
    }

    /// Consumes the entity, yielding its edge payload if it has one.
    pub fn into_edge(self) -> Option<EdgeData> {
        if let EntityData::Edge(edge) = self.data {
            Some(edge)
        } else {
            None
        }
    }

    /// Consumes the entity, yielding its vector payload if it has one.
    pub fn into_vector(self) -> Option<VectorData> {
        if let EntityData::Vector(vector) = self.data {
            Some(vector)
        } else {
            None
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A table-row entity reports the table engine and its table name.
    #[test]
    fn test_entity_creation() {
        let values = vec![Value::text("alice".to_string()), Value::Integer(25)];
        let row = UnifiedEntity::table_row(EntityId::new(1), "users", 100, values);

        assert!(row.data.is_row());
        assert_eq!(row.kind.storage_type(), "table");
        assert_eq!(row.kind.collection(), "users");
    }

    /// A cross reference keeps its endpoints and maps to the reverse link type.
    #[test]
    fn test_cross_refs() {
        let (source, target) = (EntityId::new(1), EntityId::new(2));
        let link = CrossRef::new(source, target, "nodes", RefType::RowToNode);

        assert_eq!(link.source, source);
        assert_eq!(link.target, target);
        assert_eq!(link.ref_type.inverse(), Some(RefType::NodeToRow));
    }

    /// Stored coordinates read back, missing ones read as zero.
    #[test]
    fn test_sparse_vector() {
        let sv = SparseVector::new(vec![0, 5, 10], vec![1.0, 2.0, 3.0], 100);

        assert_eq!(sv.nnz(), 3);
        assert_eq!(sv.get(5), 2.0);
        assert_eq!(sv.get(3), 0.0);
        assert!(sv.sparsity() > 0.9);
    }

    /// Embeddings are attached lazily and looked up by name.
    #[test]
    fn test_embedding_slots() {
        let text = vec![Value::text("Hello world".to_string())];
        let mut doc = UnifiedEntity::table_row(EntityId::new(1), "documents", 1, text);

        let slot = EmbeddingSlot::new("content", vec![0.1, 0.2, 0.3], "text-embedding-3-small");
        doc.add_embedding(slot);

        assert_eq!(doc.embeddings().len(), 1);
        assert!(doc.get_embedding("content").is_some());
        assert!(doc.get_embedding("summary").is_none());
    }
}