1use std::collections::HashMap;
7use std::fmt;
8use std::sync::Arc;
9
10use crate::storage::schema::Value;
11
/// Numeric identifier of an entity, implemented as a newtype over `u64`.
///
/// The `Display` form is the letter `e` followed by the raw number
/// (for example `e42`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntityId(pub u64);

impl EntityId {
    /// Wraps a raw `u64` value.
    pub fn new(id: u64) -> Self {
        EntityId(id)
    }

    /// Returns the wrapped `u64` value.
    pub fn raw(&self) -> u64 {
        let EntityId(value) = *self;
        value
    }
}

impl fmt::Display for EntityId {
    /// Writes `e<raw>`; width/fill flags of the outer formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("e{}", self.raw()))
    }
}

impl From<u64> for EntityId {
    /// Equivalent to [`EntityId::new`].
    fn from(id: u64) -> Self {
        EntityId::new(id)
    }
}
39
/// Describes which kind of storage an entity belongs to, together with the
/// addressing details each kind carries.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EntityKind {
    /// A table row, addressed by table name and row id.
    TableRow { table: Arc<str>, row_id: u64 },
    /// A graph node; details are boxed in [`GraphNodeKind`].
    GraphNode(Box<GraphNodeKind>),
    /// A graph edge; details are boxed in [`GraphEdgeKind`].
    GraphEdge(Box<GraphEdgeKind>),
    /// A vector stored in the named collection.
    Vector { collection: String },
    /// A time-series point; details are boxed in [`TimeSeriesPointKind`].
    TimeSeriesPoint(Box<TimeSeriesPointKind>),
    /// A queue message, addressed by queue name and position.
    QueueMessage { queue: String, position: u64 },
}

/// Kind details for a graph node.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GraphNodeKind {
    /// Node label; also what [`EntityKind::collection`] reports for nodes.
    pub label: String,
    /// Node type name.
    pub node_type: String,
}

/// Kind details for a graph edge.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GraphEdgeKind {
    /// Edge label; also what [`EntityKind::collection`] reports for edges.
    pub label: String,
    /// Identifier of the node the edge starts at.
    pub from_node: String,
    /// Identifier of the node the edge ends at.
    pub to_node: String,
    /// Integer edge weight (an integer keeps the type `Eq` + `Hash`).
    pub weight: u32,
}

/// Kind details for a time-series point.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TimeSeriesPointKind {
    /// Series name; also what [`EntityKind::collection`] reports for points.
    pub series: String,
    /// Metric name.
    pub metric: String,
}

impl EntityKind {
    /// Returns a fixed string tag naming the storage type of this kind.
    pub fn storage_type(&self) -> &'static str {
        match *self {
            EntityKind::TableRow { .. } => "table",
            EntityKind::GraphNode(..) => "graph_node",
            EntityKind::GraphEdge(..) => "graph_edge",
            EntityKind::Vector { .. } => "vector",
            EntityKind::TimeSeriesPoint(..) => "timeseries",
            EntityKind::QueueMessage { .. } => "queue",
        }
    }

    /// Returns the collection-like name of this kind: the table name, the
    /// node/edge label, the vector collection, the series, or the queue.
    pub fn collection(&self) -> &str {
        match self {
            EntityKind::TableRow { table: name, .. } => &**name,
            EntityKind::GraphNode(node) => node.label.as_str(),
            EntityKind::GraphEdge(edge) => edge.label.as_str(),
            EntityKind::Vector { collection: name } => name.as_str(),
            EntityKind::TimeSeriesPoint(point) => point.series.as_str(),
            EntityKind::QueueMessage { queue: name, .. } => name.as_str(),
        }
    }
}
102
103#[derive(Debug, Clone)]
105pub enum EntityData {
106 Row(RowData),
108 Node(NodeData),
110 Edge(EdgeData),
112 Vector(VectorData),
114 TimeSeries(TimeSeriesData),
116 QueueMessage(QueueMessageData),
118}
119
120impl EntityData {
121 pub fn is_row(&self) -> bool {
123 matches!(self, Self::Row(_))
124 }
125
126 pub fn is_node(&self) -> bool {
128 matches!(self, Self::Node(_))
129 }
130
131 pub fn is_edge(&self) -> bool {
133 matches!(self, Self::Edge(_))
134 }
135
136 pub fn is_vector(&self) -> bool {
138 matches!(self, Self::Vector(_))
139 }
140
141 pub fn as_row(&self) -> Option<&RowData> {
143 match self {
144 Self::Row(r) => Some(r),
145 _ => None,
146 }
147 }
148
149 pub fn as_node(&self) -> Option<&NodeData> {
151 match self {
152 Self::Node(n) => Some(n),
153 _ => None,
154 }
155 }
156
157 pub fn as_edge(&self) -> Option<&EdgeData> {
159 match self {
160 Self::Edge(e) => Some(e),
161 _ => None,
162 }
163 }
164
165 pub fn as_vector(&self) -> Option<&VectorData> {
167 match self {
168 Self::Vector(v) => Some(v),
169 _ => None,
170 }
171 }
172}
173
174#[derive(Debug, Clone)]
176pub struct RowData {
177 pub columns: Vec<Value>,
179 pub named: Option<HashMap<String, Value>>,
181 pub schema: Option<std::sync::Arc<Vec<String>>>,
185}
186
187impl RowData {
188 pub fn new(columns: Vec<Value>) -> Self {
190 Self {
191 columns,
192 named: None,
193 schema: None,
194 }
195 }
196
197 pub fn with_names(columns: Vec<Value>, names: Vec<String>) -> Self {
199 let named: HashMap<String, Value> =
200 names.into_iter().zip(columns.iter().cloned()).collect();
201 Self {
202 columns,
203 named: Some(named),
204 schema: None,
205 }
206 }
207
208 pub fn get_field(&self, name: &str) -> Option<&Value> {
210 if let Some(ref named) = self.named {
212 return named.get(name);
213 }
214 if let Some(ref schema) = self.schema {
216 if let Some(idx) = schema.iter().position(|s| s == name) {
217 return self.columns.get(idx);
218 }
219 }
220 None
221 }
222
223 pub fn iter_fields(&self) -> Box<dyn Iterator<Item = (&str, &Value)> + '_> {
225 if let Some(ref named) = self.named {
226 Box::new(named.iter().map(|(k, v)| (k.as_str(), v)))
227 } else if let Some(ref schema) = self.schema {
228 Box::new(
229 schema
230 .iter()
231 .zip(self.columns.iter())
232 .map(|(k, v)| (k.as_str(), v)),
233 )
234 } else {
235 Box::new(std::iter::empty())
236 }
237 }
238
239 pub fn get(&self, index: usize) -> Option<&Value> {
241 self.columns.get(index)
242 }
243
244 pub fn get_by_name(&self, name: &str) -> Option<&Value> {
246 self.named.as_ref()?.get(name)
247 }
248
249 pub fn len(&self) -> usize {
251 self.columns.len()
252 }
253
254 pub fn is_empty(&self) -> bool {
256 self.columns.is_empty()
257 }
258}
259
260#[derive(Debug, Clone)]
262pub struct NodeData {
263 pub properties: HashMap<String, Value>,
265}
266
267impl NodeData {
268 pub fn new() -> Self {
270 Self {
271 properties: HashMap::new(),
272 }
273 }
274
275 pub fn with_properties(properties: HashMap<String, Value>) -> Self {
277 Self { properties }
278 }
279
280 pub fn set(&mut self, key: impl Into<String>, value: Value) {
282 self.properties.insert(key.into(), value);
283 }
284
285 pub fn get(&self, key: &str) -> Option<&Value> {
287 self.properties.get(key)
288 }
289
290 pub fn has(&self, key: &str) -> bool {
292 self.properties.contains_key(key)
293 }
294}
295
296impl Default for NodeData {
297 fn default() -> Self {
298 Self::new()
299 }
300}
301
302#[derive(Debug, Clone)]
304pub struct EdgeData {
305 pub properties: HashMap<String, Value>,
307 pub weight: f32,
309}
310
311impl EdgeData {
312 pub fn new(weight: f32) -> Self {
314 Self {
315 properties: HashMap::new(),
316 weight,
317 }
318 }
319
320 pub fn with_properties(weight: f32, properties: HashMap<String, Value>) -> Self {
322 Self { properties, weight }
323 }
324
325 pub fn set(&mut self, key: impl Into<String>, value: Value) {
327 self.properties.insert(key.into(), value);
328 }
329
330 pub fn get(&self, key: &str) -> Option<&Value> {
332 self.properties.get(key)
333 }
334}
335
336impl Default for EdgeData {
337 fn default() -> Self {
338 Self::new(1.0)
339 }
340}
341
342#[derive(Debug, Clone)]
344pub struct VectorData {
345 pub dense: Vec<f32>,
347 pub sparse: Option<SparseVector>,
349 pub content: Option<String>,
351}
352
353impl VectorData {
354 pub fn new(dense: Vec<f32>) -> Self {
356 Self {
357 dense,
358 sparse: None,
359 content: None,
360 }
361 }
362
363 pub fn with_sparse(dense: Vec<f32>, sparse: SparseVector) -> Self {
365 Self {
366 dense,
367 sparse: Some(sparse),
368 content: None,
369 }
370 }
371
372 pub fn with_content(mut self, content: impl Into<String>) -> Self {
374 self.content = Some(content.into());
375 self
376 }
377
378 pub fn dimension(&self) -> usize {
380 self.dense.len()
381 }
382
383 pub fn is_hybrid(&self) -> bool {
385 self.sparse.is_some()
386 }
387}
388
/// Payload of a single time-series point.
#[derive(Debug, Clone)]
pub struct TimeSeriesData {
    /// Metric name.
    pub metric: String,
    /// Timestamp of the point; the `_ns` suffix suggests nanoseconds
    /// (NOTE(review): epoch/unit not established in this file — confirm).
    pub timestamp_ns: u64,
    /// Measured value.
    pub value: f64,
    /// String key/value tags attached to the point.
    pub tags: HashMap<String, String>,
}
401
/// Payload of a queue message entity.
#[derive(Debug, Clone)]
pub struct QueueMessageData {
    /// The message body.
    pub payload: Value,
    /// Optional priority; `None` means no priority was assigned.
    /// NOTE(review): ordering direction (higher vs. lower first) is not
    /// established in this file — confirm against the queue implementation.
    pub priority: Option<i32>,
    /// Enqueue timestamp; the `_ns` suffix suggests nanoseconds
    /// (NOTE(review): epoch/unit not established in this file — confirm).
    pub enqueued_at_ns: u64,
    /// Attempt counter; presumably delivery attempts so far — confirm.
    pub attempts: u32,
    /// Attempt limit; presumably the cap for `attempts` — confirm.
    pub max_attempts: u32,
    /// Acknowledgement flag.
    pub acked: bool,
}
418
/// Sparse vector stored as parallel `indices` / `values` arrays.
///
/// `indices[k]` is the position of the k-th stored entry and `values[k]` is
/// its value. The two arrays are expected to have equal length, but because
/// the fields are public that is not guaranteed; lookups tolerate a mismatch.
#[derive(Debug, Clone)]
pub struct SparseVector {
    /// Positions of the stored entries.
    pub indices: Vec<u32>,
    /// Values of the stored entries, parallel to `indices`.
    pub values: Vec<f32>,
    /// Declared total dimension of the vector.
    pub dimension: usize,
}

impl SparseVector {
    /// Creates a sparse vector from parallel index/value arrays.
    ///
    /// # Panics
    ///
    /// In debug builds, panics if `indices` and `values` differ in length.
    /// Release builds do not check.
    pub fn new(indices: Vec<u32>, values: Vec<f32>, dimension: usize) -> Self {
        debug_assert_eq!(indices.len(), values.len());
        Self {
            indices,
            values,
            dimension,
        }
    }

    /// Number of stored entries (the length of `indices`).
    pub fn nnz(&self) -> usize {
        self.indices.len()
    }

    /// Fraction of positions that are not stored: `1 - nnz / dimension`.
    ///
    /// Returns `1.0` when `dimension` is zero.
    pub fn sparsity(&self) -> f32 {
        if self.dimension == 0 {
            1.0
        } else {
            1.0 - (self.nnz() as f32 / self.dimension as f32)
        }
    }

    /// Returns the value stored at `index`, or `0.0` when it is not stored.
    ///
    /// If `index` occurs more than once, the first occurrence wins. The
    /// lookup walks `indices` and `values` in lockstep, so an index entry
    /// with no corresponding value (arrays of unequal length) is treated as
    /// absent instead of causing an out-of-bounds panic.
    pub fn get(&self, index: u32) -> f32 {
        self.indices
            .iter()
            .zip(&self.values)
            .find(|(&stored, _)| stored == index)
            .map_or(0.0, |(_, &value)| value)
    }
}
464
/// A named embedding vector together with the model that produced it.
#[derive(Debug, Clone)]
pub struct EmbeddingSlot {
    /// Slot name; used for lookup by name.
    pub name: String,
    /// The embedding components.
    pub vector: Vec<f32>,
    /// Name of the model that generated the embedding.
    pub model: String,
    /// Length of `vector` at construction time.
    pub dimension: usize,
    /// Creation time as whole seconds since the Unix epoch.
    pub generated_at: u64,
}

/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}

impl EmbeddingSlot {
    /// Creates a slot, recording the vector length as `dimension` and the
    /// current time as `generated_at`.
    pub fn new(name: impl Into<String>, vector: Vec<f32>, model: impl Into<String>) -> Self {
        EmbeddingSlot {
            dimension: vector.len(),
            name: name.into(),
            vector,
            model: model.into(),
            generated_at: current_unix_secs(),
        }
    }
}
500
/// A single entity of any kind, with its payload, timestamps, transaction
/// visibility bounds, and optional auxiliary data.
#[derive(Debug, Clone)]
pub struct UnifiedEntity {
    /// Identifier of this entity.
    pub id: EntityId,
    /// Explicit logical id, if one was set; `logical_id()` falls back to
    /// `id` when this is `None`.
    logical_id: Option<EntityId>,
    /// What kind of entity this is and how it is addressed.
    pub kind: EntityKind,
    /// Creation time in Unix seconds (set by `new`).
    pub created_at: u64,
    /// Last-modified time in Unix seconds (set by `new`, refreshed when
    /// embeddings or cross-references are added).
    pub updated_at: u64,
    /// The entity payload.
    pub data: EntityData,
    /// Sequence number; `new` initializes it to 0.
    /// NOTE(review): who assigns non-zero values is not shown here — confirm.
    pub sequence_id: u64,
    /// Bitmask of field-name bits computed from `data` by
    /// `compute_entity_field_bloom` at construction.
    pub field_bloom: u64,
    /// Lower transaction bound used by `is_visible`; 0 disables the check.
    pub xmin: u64,
    /// Upper transaction bound used by `is_visible`; 0 disables the check.
    pub xmax: u64,
    /// Embeddings and cross-references, allocated only on first mutable
    /// access; `None` means both lists are empty.
    aux: Option<Box<EntityAux>>,
}
550
/// Auxiliary per-entity data kept behind an optional box on `UnifiedEntity`.
#[derive(Debug, Clone, Default)]
pub struct EntityAux {
    /// Embedding slots attached to the entity.
    pub embeddings: Vec<EmbeddingSlot>,
    /// Cross-references attached to the entity.
    pub cross_refs: Vec<CrossRef>,
}
559
560impl UnifiedEntity {
561 pub fn embeddings(&self) -> &[EmbeddingSlot] {
563 self.aux
564 .as_ref()
565 .map(|a| a.embeddings.as_slice())
566 .unwrap_or(&[])
567 }
568
569 pub fn cross_refs(&self) -> &[CrossRef] {
571 self.aux
572 .as_ref()
573 .map(|a| a.cross_refs.as_slice())
574 .unwrap_or(&[])
575 }
576
577 pub fn embeddings_mut(&mut self) -> &mut Vec<EmbeddingSlot> {
579 &mut self.aux.get_or_insert_with(Default::default).embeddings
580 }
581
582 pub fn cross_refs_mut(&mut self) -> &mut Vec<CrossRef> {
584 &mut self.aux.get_or_insert_with(Default::default).cross_refs
585 }
586
587 pub fn has_aux(&self) -> bool {
589 self.aux.is_some()
590 }
591}
592
/// Maps a field name to a single-bit `u64` mask.
///
/// The bit position is the low 6 bits of the byte at index `len / 2` of the
/// name's UTF-8 bytes. An empty name maps to `0` (no bit set).
#[inline]
pub fn field_name_bloom(name: &str) -> u64 {
    let bytes = name.as_bytes();
    match bytes.get(bytes.len() / 2) {
        Some(&middle) => 1u64 << (middle & 63),
        // Only reachable for an empty name: index 0 of an empty slice.
        None => 0,
    }
}
606
607pub fn compute_entity_field_bloom(data: &EntityData) -> u64 {
611 match data {
612 EntityData::Row(row) => {
613 if row.schema.is_some() {
614 return 0;
617 }
618 if let Some(named) = &row.named {
619 named.keys().fold(0u64, |acc, k| acc | field_name_bloom(k))
620 } else {
621 0
622 }
623 }
624 EntityData::Node(node) => node
625 .properties
626 .keys()
627 .fold(0u64, |acc, k| acc | field_name_bloom(k)),
628 EntityData::Edge(edge) => edge
629 .properties
630 .keys()
631 .fold(0u64, |acc, k| acc | field_name_bloom(k)),
632 _ => 0,
634 }
635}
636
637impl UnifiedEntity {
638 pub fn new(id: EntityId, kind: EntityKind, data: EntityData) -> Self {
640 let now = current_unix_secs();
641 let field_bloom = compute_entity_field_bloom(&data);
642
643 Self {
644 id,
645 logical_id: None,
646 kind,
647 created_at: now,
648 updated_at: now,
649 data,
650 sequence_id: 0,
651 field_bloom,
652 xmin: 0,
655 xmax: 0,
656 aux: None,
657 }
658 }
659
660 #[inline]
671 pub fn is_visible(&self, snapshot_xid: u64) -> bool {
672 if self.xmin != 0 && self.xmin > snapshot_xid {
673 return false;
674 }
675 if self.xmax != 0 && self.xmax <= snapshot_xid {
676 return false;
677 }
678 true
679 }
680
681 #[inline]
684 pub fn set_xmin(&mut self, xid: u64) {
685 self.xmin = xid;
686 }
687
688 #[inline]
692 pub fn set_xmax(&mut self, xid: u64) {
693 self.xmax = xid;
694 }
695
696 #[inline]
699 pub fn logical_id(&self) -> EntityId {
700 self.logical_id.unwrap_or(self.id)
701 }
702
703 #[inline]
705 pub fn has_explicit_logical_id(&self) -> bool {
706 self.logical_id.is_some()
707 }
708
709 #[inline]
711 pub fn set_logical_id(&mut self, logical_id: EntityId) {
712 self.logical_id = Some(logical_id);
713 }
714
715 #[inline]
719 pub(crate) fn ensure_table_logical_id(&mut self) {
720 if matches!(self.kind, EntityKind::TableRow { .. }) && self.logical_id.is_none() {
721 self.logical_id = Some(self.id);
722 }
723 }
724
725 pub fn table_row(
727 id: EntityId,
728 table: impl Into<Arc<str>>,
729 row_id: u64,
730 columns: Vec<Value>,
731 ) -> Self {
732 Self::new(
733 id,
734 EntityKind::TableRow {
735 table: table.into(),
736 row_id,
737 },
738 EntityData::Row(RowData::new(columns)),
739 )
740 }
741
742 pub fn graph_node(
744 id: EntityId,
745 label: impl Into<String>,
746 node_type: impl Into<String>,
747 properties: HashMap<String, Value>,
748 ) -> Self {
749 Self::new(
750 id,
751 EntityKind::GraphNode(Box::new(GraphNodeKind {
752 label: label.into(),
753 node_type: node_type.into(),
754 })),
755 EntityData::Node(NodeData::with_properties(properties)),
756 )
757 }
758
759 pub fn graph_edge(
761 id: EntityId,
762 label: impl Into<String>,
763 from: impl Into<String>,
764 to: impl Into<String>,
765 weight: f32,
766 properties: HashMap<String, Value>,
767 ) -> Self {
768 Self::new(
769 id,
770 EntityKind::GraphEdge(Box::new(GraphEdgeKind {
771 label: label.into(),
772 from_node: from.into(),
773 to_node: to.into(),
774 weight: (weight * 1000.0) as u32,
775 })),
776 EntityData::Edge(EdgeData::with_properties(weight, properties)),
777 )
778 }
779
780 pub fn vector(id: EntityId, collection: impl Into<String>, vector: Vec<f32>) -> Self {
782 Self::new(
783 id,
784 EntityKind::Vector {
785 collection: collection.into(),
786 },
787 EntityData::Vector(VectorData::new(vector)),
788 )
789 }
790
791 pub fn add_embedding(&mut self, slot: EmbeddingSlot) {
793 self.embeddings_mut().push(slot);
794 self.touch();
795 }
796
797 pub fn add_cross_ref(&mut self, cross_ref: CrossRef) {
799 self.cross_refs_mut().push(cross_ref);
800 self.touch();
801 }
802
803 pub fn get_embedding(&self, name: &str) -> Option<&EmbeddingSlot> {
805 self.embeddings().iter().find(|e| e.name == name)
806 }
807
808 fn touch(&mut self) {
810 self.updated_at = current_unix_secs();
811 }
812
813 pub fn is_stale(&self, max_age_secs: u64) -> bool {
815 let now = current_unix_secs();
816 now.saturating_sub(self.updated_at) > max_age_secs
817 }
818}
819
820#[derive(Debug, Clone, PartialEq)]
822pub struct CrossRef {
823 pub source: EntityId,
825 pub target: EntityId,
827 pub target_collection: String,
829 pub ref_type: RefType,
831 pub weight: f32,
833 pub created_at: u64,
835}
836
837impl CrossRef {
838 pub fn new(
840 source: EntityId,
841 target: EntityId,
842 target_collection: impl Into<String>,
843 ref_type: RefType,
844 ) -> Self {
845 Self {
846 source,
847 target,
848 target_collection: target_collection.into(),
849 ref_type,
850 weight: 1.0,
851 created_at: current_unix_secs(),
852 }
853 }
854
855 pub fn with_weight(
857 source: EntityId,
858 target: EntityId,
859 target_collection: impl Into<String>,
860 ref_type: RefType,
861 weight: f32,
862 ) -> Self {
863 let mut cr = Self::new(source, target, target_collection, ref_type);
864 cr.weight = weight;
865 cr
866 }
867}
868
/// Kind of relationship expressed by a cross-reference.
///
/// The explicit discriminants are the byte codes produced by
/// [`RefType::to_byte`] and accepted by [`RefType::from_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RefType {
    RowToNode = 0,
    RowToEdge = 1,
    NodeToRow = 2,
    RowToVector = 3,
    VectorToRow = 4,
    NodeToVector = 5,
    EdgeToVector = 6,
    VectorToNode = 7,
    SimilarTo = 8,
    RelatedTo = 9,
    DerivesFrom = 10,
    Mentions = 11,
    Contains = 12,
    DependsOn = 13,
}

impl RefType {
    /// Every variant, positioned at the index equal to its byte code.
    const BY_BYTE: [RefType; 14] = [
        RefType::RowToNode,
        RefType::RowToEdge,
        RefType::NodeToRow,
        RefType::RowToVector,
        RefType::VectorToRow,
        RefType::NodeToVector,
        RefType::EdgeToVector,
        RefType::VectorToNode,
        RefType::SimilarTo,
        RefType::RelatedTo,
        RefType::DerivesFrom,
        RefType::Mentions,
        RefType::Contains,
        RefType::DependsOn,
    ];

    /// Returns the reference type pointing the opposite way, if one exists.
    ///
    /// Symmetric types are their own inverse; types with no counterpart
    /// return `None`.
    pub fn inverse(&self) -> Option<Self> {
        match *self {
            RefType::RowToNode => Some(RefType::NodeToRow),
            RefType::NodeToRow => Some(RefType::RowToNode),
            RefType::RowToVector => Some(RefType::VectorToRow),
            RefType::VectorToRow => Some(RefType::RowToVector),
            RefType::NodeToVector => Some(RefType::VectorToNode),
            RefType::VectorToNode => Some(RefType::NodeToVector),
            RefType::SimilarTo => Some(RefType::SimilarTo),
            RefType::RelatedTo => Some(RefType::RelatedTo),
            RefType::RowToEdge
            | RefType::EdgeToVector
            | RefType::DerivesFrom
            | RefType::Mentions
            | RefType::Contains
            | RefType::DependsOn => None,
        }
    }

    /// Returns `true` for types that are their own inverse
    /// (`SimilarTo` and `RelatedTo`).
    pub fn is_symmetric(&self) -> bool {
        self.inverse() == Some(*self)
    }

    /// Encodes the type as its byte code (0 through 13).
    pub fn to_byte(&self) -> u8 {
        *self as u8
    }

    /// Decodes a byte code; any value above 13 decodes to `RelatedTo`.
    pub fn from_byte(byte: u8) -> Self {
        Self::BY_BYTE
            .get(usize::from(byte))
            .copied()
            .unwrap_or(RefType::RelatedTo)
    }
}
957
958impl From<Vec<Value>> for RowData {
960 fn from(columns: Vec<Value>) -> Self {
961 RowData::new(columns)
962 }
963}
964
965impl From<HashMap<String, Value>> for NodeData {
967 fn from(properties: HashMap<String, Value>) -> Self {
968 NodeData::with_properties(properties)
969 }
970}
971
972impl From<Vec<f32>> for VectorData {
974 fn from(dense: Vec<f32>) -> Self {
975 VectorData::new(dense)
976 }
977}
978
979impl From<(Vec<f32>, SparseVector)> for VectorData {
981 fn from((dense, sparse): (Vec<f32>, SparseVector)) -> Self {
982 VectorData::with_sparse(dense, sparse)
983 }
984}
985
986impl UnifiedEntity {
988 pub fn from_properties(
990 id: EntityId,
991 label: impl Into<String>,
992 node_type: impl Into<String>,
993 properties: impl IntoIterator<Item = (impl Into<String>, Value)>,
994 ) -> Self {
995 let props: HashMap<String, Value> =
996 properties.into_iter().map(|(k, v)| (k.into(), v)).collect();
997 Self::graph_node(id, label, node_type, props)
998 }
999
1000 pub fn into_row(self) -> Option<RowData> {
1002 match self.data {
1003 EntityData::Row(r) => Some(r),
1004 _ => None,
1005 }
1006 }
1007
1008 pub fn into_node(self) -> Option<NodeData> {
1010 match self.data {
1011 EntityData::Node(n) => Some(n),
1012 _ => None,
1013 }
1014 }
1015
1016 pub fn into_edge(self) -> Option<EdgeData> {
1018 match self.data {
1019 EntityData::Edge(e) => Some(e),
1020 _ => None,
1021 }
1022 }
1023
1024 pub fn into_vector(self) -> Option<VectorData> {
1026 match self.data {
1027 EntityData::Vector(v) => Some(v),
1028 _ => None,
1029 }
1030 }
1031}
1032
#[cfg(test)]
mod tests {
    use super::*;

    /// A table-row entity reports row data, the "table" storage type and its
    /// table name as the collection.
    #[test]
    fn test_entity_creation() {
        let columns = vec![Value::text("alice".to_string()), Value::Integer(25)];
        let user = UnifiedEntity::table_row(EntityId::new(1), "users", 100, columns);

        assert!(user.data.is_row());
        assert_eq!(user.kind.storage_type(), "table");
        assert_eq!(user.kind.collection(), "users");
    }

    /// A cross-reference keeps its endpoints and its type has the expected
    /// inverse.
    #[test]
    fn test_cross_refs() {
        let (source, target) = (EntityId::new(1), EntityId::new(2));

        let link = CrossRef::new(source, target, "nodes", RefType::RowToNode);
        assert_eq!(link.source, source);
        assert_eq!(link.target, target);
        assert_eq!(link.ref_type.inverse(), Some(RefType::NodeToRow));
    }

    /// Sparse lookups return stored values, and zero for absent indices.
    #[test]
    fn test_sparse_vector() {
        let indices = vec![0, 5, 10];
        let values = vec![1.0, 2.0, 3.0];
        let vector = SparseVector::new(indices, values, 100);

        assert_eq!(vector.nnz(), 3);
        assert_eq!(vector.get(5), 2.0);
        assert_eq!(vector.get(3), 0.0);
        assert!(vector.sparsity() > 0.9);
    }

    /// Embeddings added to an entity can be found again by slot name.
    #[test]
    fn test_embedding_slots() {
        let mut document = UnifiedEntity::table_row(
            EntityId::new(1),
            "documents",
            1,
            vec![Value::text("Hello world".to_string())],
        );

        let slot = EmbeddingSlot::new("content", vec![0.1, 0.2, 0.3], "text-embedding-3-small");
        document.add_embedding(slot);

        assert_eq!(document.embeddings().len(), 1);
        assert!(document.get_embedding("content").is_some());
        assert!(document.get_embedding("summary").is_none());
    }
}
1092}