1use std::collections::HashMap;
7use std::fmt;
8use std::sync::Arc;
9
10use crate::storage::schema::Value;
11
/// Lowest entity id intended for user data.
///
/// NOTE(review): ids below this value are presumably reserved for
/// system/internal entities — confirm against the id allocator.
pub const FIRST_USER_ENTITY_ID: u64 = 1024;
24
/// Copyable numeric identifier of a stored entity.
///
/// Totally ordered and hashable, so it can key both hash maps and sorted
/// indexes. Formats with [`fmt::Display`] as `e<id>` (for example `e42`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntityId(pub u64);

impl EntityId {
    /// Wraps a raw numeric id.
    pub fn new(id: u64) -> Self {
        EntityId(id)
    }

    /// Returns the wrapped numeric id.
    pub fn raw(&self) -> u64 {
        let EntityId(inner) = *self;
        inner
    }
}

impl fmt::Display for EntityId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "e{}", self.0)
    }
}

impl From<u64> for EntityId {
    fn from(raw: u64) -> Self {
        EntityId::new(raw)
    }
}
52
/// Which storage model an entity lives in, plus the information needed to
/// locate it within that model.
///
/// NOTE(review): the payload-heavy variants are boxed, presumably to keep
/// `EntityKind` itself small — confirm intent.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EntityKind {
    /// Row `row_id` of relational table `table`; `table` is the collection name.
    TableRow { table: Arc<str>, row_id: u64 },
    /// Graph node; its `label` doubles as the collection name.
    GraphNode(Box<GraphNodeKind>),
    /// Graph edge; its `label` doubles as the collection name.
    GraphEdge(Box<GraphEdgeKind>),
    /// Vector stored in the named `collection`.
    Vector { collection: String },
    /// Time-series point; its `series` doubles as the collection name.
    TimeSeriesPoint(Box<TimeSeriesPointKind>),
    /// Message at `position` in `queue`; `queue` is the collection name.
    QueueMessage { queue: String, position: u64 },
}
69
/// Locating information for a graph node (boxed inside [`EntityKind::GraphNode`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GraphNodeKind {
    /// Node label; returned by `EntityKind::collection` for nodes.
    pub label: String,
    /// Free-form node type name.
    pub node_type: String,
}

/// Locating information for a graph edge (boxed inside [`EntityKind::GraphEdge`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GraphEdgeKind {
    /// Edge label; returned by `EntityKind::collection` for edges.
    pub label: String,
    /// Identifier of the source node, as a string (format not defined here).
    pub from_node: String,
    /// Identifier of the target node, as a string (format not defined here).
    pub to_node: String,
    /// Edge weight in fixed-point thousandths: `UnifiedEntity::graph_edge`
    /// stores `(weight * 1000.0) as u32`, so it is truncated and saturating.
    /// Kept as an integer so the kind can derive `Eq`/`Hash`.
    pub weight: u32,
}

/// Locating information for a time-series point (boxed inside
/// [`EntityKind::TimeSeriesPoint`]).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TimeSeriesPointKind {
    /// Series name; returned by `EntityKind::collection` for points.
    pub series: String,
    /// Metric name within the series.
    pub metric: String,
}
89
90impl EntityKind {
91 pub fn storage_type(&self) -> &'static str {
93 match self {
94 Self::TableRow { .. } => "table",
95 Self::GraphNode(_) => "graph_node",
96 Self::GraphEdge(_) => "graph_edge",
97 Self::Vector { .. } => "vector",
98 Self::TimeSeriesPoint(_) => "timeseries",
99 Self::QueueMessage { .. } => "queue",
100 }
101 }
102
103 pub fn collection(&self) -> &str {
105 match self {
106 Self::TableRow { table, .. } => table,
107 Self::GraphNode(n) => &n.label,
108 Self::GraphEdge(e) => &e.label,
109 Self::Vector { collection } => collection,
110 Self::TimeSeriesPoint(ts) => &ts.series,
111 Self::QueueMessage { queue, .. } => queue,
112 }
113 }
114}
115
/// Payload of an entity.
///
/// `UnifiedEntity::new` accepts any combination of [`EntityKind`] and
/// `EntityData`. The typed constructors (`table_row`, `graph_node`,
/// `graph_edge`, `vector`) pair matching variants.
#[derive(Debug, Clone)]
pub enum EntityData {
    /// Column values of a table row.
    Row(RowData),
    /// Properties of a graph node.
    Node(NodeData),
    /// Weight and properties of a graph edge.
    Edge(EdgeData),
    /// Dense (optionally plus sparse) vector.
    Vector(VectorData),
    /// A single time-series sample.
    TimeSeries(TimeSeriesData),
    /// A queued message and its delivery bookkeeping.
    QueueMessage(QueueMessageData),
}
132
133impl EntityData {
134 pub fn is_row(&self) -> bool {
136 matches!(self, Self::Row(_))
137 }
138
139 pub fn is_node(&self) -> bool {
141 matches!(self, Self::Node(_))
142 }
143
144 pub fn is_edge(&self) -> bool {
146 matches!(self, Self::Edge(_))
147 }
148
149 pub fn is_vector(&self) -> bool {
151 matches!(self, Self::Vector(_))
152 }
153
154 pub fn as_row(&self) -> Option<&RowData> {
156 match self {
157 Self::Row(r) => Some(r),
158 _ => None,
159 }
160 }
161
162 pub fn as_node(&self) -> Option<&NodeData> {
164 match self {
165 Self::Node(n) => Some(n),
166 _ => None,
167 }
168 }
169
170 pub fn as_edge(&self) -> Option<&EdgeData> {
172 match self {
173 Self::Edge(e) => Some(e),
174 _ => None,
175 }
176 }
177
178 pub fn as_vector(&self) -> Option<&VectorData> {
180 match self {
181 Self::Vector(v) => Some(v),
182 _ => None,
183 }
184 }
185}
186
/// Column values of a table row, with optional ways to look them up by name.
///
/// Name lookups (`RowData::get_field`, `RowData::iter_fields`) consult
/// `named` first and fall back to `schema` only when `named` is `None`.
#[derive(Debug, Clone)]
pub struct RowData {
    /// Positional column values.
    pub columns: Vec<Value>,
    /// Optional name -> value map. `RowData::with_names` fills it with
    /// clones of `columns`.
    pub named: Option<HashMap<String, Value>>,
    /// Optional shared list of column names, positionally parallel to
    /// `columns`.
    pub schema: Option<std::sync::Arc<Vec<String>>>,
}
199
200impl RowData {
201 pub fn new(columns: Vec<Value>) -> Self {
203 Self {
204 columns,
205 named: None,
206 schema: None,
207 }
208 }
209
210 pub fn with_names(columns: Vec<Value>, names: Vec<String>) -> Self {
212 let named: HashMap<String, Value> =
213 names.into_iter().zip(columns.iter().cloned()).collect();
214 Self {
215 columns,
216 named: Some(named),
217 schema: None,
218 }
219 }
220
221 pub fn get_field(&self, name: &str) -> Option<&Value> {
223 if let Some(ref named) = self.named {
225 return named.get(name);
226 }
227 if let Some(ref schema) = self.schema {
229 if let Some(idx) = schema.iter().position(|s| s == name) {
230 return self.columns.get(idx);
231 }
232 }
233 None
234 }
235
236 pub fn iter_fields(&self) -> Box<dyn Iterator<Item = (&str, &Value)> + '_> {
238 if let Some(ref named) = self.named {
239 Box::new(named.iter().map(|(k, v)| (k.as_str(), v)))
240 } else if let Some(ref schema) = self.schema {
241 Box::new(
242 schema
243 .iter()
244 .zip(self.columns.iter())
245 .map(|(k, v)| (k.as_str(), v)),
246 )
247 } else {
248 Box::new(std::iter::empty())
249 }
250 }
251
252 pub fn get(&self, index: usize) -> Option<&Value> {
254 self.columns.get(index)
255 }
256
257 pub fn get_by_name(&self, name: &str) -> Option<&Value> {
259 self.named.as_ref()?.get(name)
260 }
261
262 pub fn len(&self) -> usize {
264 self.columns.len()
265 }
266
267 pub fn is_empty(&self) -> bool {
269 self.columns.is_empty()
270 }
271}
272
273#[derive(Debug, Clone)]
275pub struct NodeData {
276 pub properties: HashMap<String, Value>,
278}
279
280impl NodeData {
281 pub fn new() -> Self {
283 Self {
284 properties: HashMap::new(),
285 }
286 }
287
288 pub fn with_properties(properties: HashMap<String, Value>) -> Self {
290 Self { properties }
291 }
292
293 pub fn set(&mut self, key: impl Into<String>, value: Value) {
295 self.properties.insert(key.into(), value);
296 }
297
298 pub fn get(&self, key: &str) -> Option<&Value> {
300 self.properties.get(key)
301 }
302
303 pub fn has(&self, key: &str) -> bool {
305 self.properties.contains_key(key)
306 }
307}
308
309impl Default for NodeData {
310 fn default() -> Self {
311 Self::new()
312 }
313}
314
315#[derive(Debug, Clone)]
317pub struct EdgeData {
318 pub properties: HashMap<String, Value>,
320 pub weight: f32,
322}
323
324impl EdgeData {
325 pub fn new(weight: f32) -> Self {
327 Self {
328 properties: HashMap::new(),
329 weight,
330 }
331 }
332
333 pub fn with_properties(weight: f32, properties: HashMap<String, Value>) -> Self {
335 Self { properties, weight }
336 }
337
338 pub fn set(&mut self, key: impl Into<String>, value: Value) {
340 self.properties.insert(key.into(), value);
341 }
342
343 pub fn get(&self, key: &str) -> Option<&Value> {
345 self.properties.get(key)
346 }
347}
348
349impl Default for EdgeData {
350 fn default() -> Self {
351 Self::new(1.0)
352 }
353}
354
355#[derive(Debug, Clone)]
357pub struct VectorData {
358 pub dense: Vec<f32>,
360 pub sparse: Option<SparseVector>,
362 pub content: Option<String>,
364}
365
366impl VectorData {
367 pub fn new(dense: Vec<f32>) -> Self {
369 Self {
370 dense,
371 sparse: None,
372 content: None,
373 }
374 }
375
376 pub fn with_sparse(dense: Vec<f32>, sparse: SparseVector) -> Self {
378 Self {
379 dense,
380 sparse: Some(sparse),
381 content: None,
382 }
383 }
384
385 pub fn with_content(mut self, content: impl Into<String>) -> Self {
387 self.content = Some(content.into());
388 self
389 }
390
391 pub fn dimension(&self) -> usize {
393 self.dense.len()
394 }
395
396 pub fn is_hybrid(&self) -> bool {
398 self.sparse.is_some()
399 }
400}
401
/// One time-series sample.
#[derive(Debug, Clone)]
pub struct TimeSeriesData {
    /// Metric name.
    pub metric: String,
    /// Sample timestamp; nanoseconds per the `_ns` suffix. NOTE(review): the
    /// epoch is not defined in this file — confirm.
    pub timestamp_ns: u64,
    /// Sampled value.
    pub value: f64,
    /// Free-form string tags attached to the sample.
    pub tags: std::collections::HashMap<String, String>,
}
414
/// A message stored in a queue, plus its delivery bookkeeping.
#[derive(Debug, Clone)]
pub struct QueueMessageData {
    /// Message body.
    pub payload: Value,
    /// Optional priority. NOTE(review): whether higher or lower values are
    /// served first is not defined in this file — confirm.
    pub priority: Option<i32>,
    /// Enqueue time; nanoseconds per the `_ns` suffix (epoch not defined here).
    pub enqueued_at_ns: u64,
    /// Delivery attempts so far. NOTE(review): presumably incremented by the
    /// queue consumer, which is outside this file.
    pub attempts: u32,
    /// Limit on `attempts`. Nothing in this file enforces it.
    pub max_attempts: u32,
    /// Whether the message has been acknowledged.
    pub acked: bool,
}
431
/// Sparse vector in coordinate form.
///
/// `indices[i]` and `values[i]` together describe one stored entry of a
/// vector of logical length `dimension`. The two arrays are meant to have
/// equal length. The fields are public, and [`SparseVector::new`] checks
/// this only with `debug_assert_eq!`, so readers must not rely on it in
/// release builds.
#[derive(Debug, Clone)]
pub struct SparseVector {
    /// Positions of the stored entries.
    pub indices: Vec<u32>,
    /// Values of the stored entries, parallel to `indices`.
    pub values: Vec<f32>,
    /// Logical length of the full vector.
    pub dimension: usize,
}

impl SparseVector {
    /// Builds a sparse vector from parallel index/value arrays.
    ///
    /// # Panics
    /// In debug builds only, panics if `indices.len() != values.len()`.
    pub fn new(indices: Vec<u32>, values: Vec<f32>, dimension: usize) -> Self {
        debug_assert_eq!(indices.len(), values.len());
        Self {
            indices,
            values,
            dimension,
        }
    }

    /// Number of stored entries (length of `indices`).
    pub fn nnz(&self) -> usize {
        self.indices.len()
    }

    /// Fraction of the `dimension` positions that are not stored.
    ///
    /// Returns `1.0` for a zero-dimensional vector. It can go negative if
    /// more entries are stored than `dimension` allows.
    pub fn sparsity(&self) -> f32 {
        if self.dimension == 0 {
            1.0
        } else {
            1.0 - (self.nnz() as f32 / self.dimension as f32)
        }
    }

    /// Value stored at `index`, or `0.0` when no entry is stored there.
    ///
    /// Scans entries linearly and returns the first match. Indices without
    /// a paired value (mismatched array lengths) are treated as absent
    /// rather than causing an out-of-bounds panic.
    pub fn get(&self, index: u32) -> f32 {
        self.indices
            .iter()
            .zip(&self.values)
            .find(|&(&i, _)| i == index)
            .map_or(0.0, |(_, &value)| value)
    }
}
477
/// A named embedding attached to an entity, tagged with the model that
/// produced it.
#[derive(Debug, Clone)]
pub struct EmbeddingSlot {
    /// Slot name used for lookup (see `UnifiedEntity::get_embedding`).
    pub name: String,
    /// Embedding components.
    pub vector: Vec<f32>,
    /// Name of the producing model.
    pub model: String,
    /// Length of `vector` at construction time.
    pub dimension: usize,
    /// Unix time in seconds when the slot was constructed.
    pub generated_at: u64,
}

/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Returns `0` instead of failing if the system clock is set before the
/// epoch.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}

impl EmbeddingSlot {
    /// Creates a slot.
    ///
    /// `dimension` is taken from `vector.len()`, and the slot is stamped
    /// with the current Unix time in seconds.
    pub fn new(name: impl Into<String>, vector: Vec<f32>, model: impl Into<String>) -> Self {
        let name = name.into();
        let model = model.into();
        let generated_at = current_unix_secs();
        EmbeddingSlot {
            dimension: vector.len(),
            name,
            vector,
            model,
            generated_at,
        }
    }
}
513
/// A stored entity of any storage model: identity, kind, payload,
/// timestamps, MVCC bounds and optional auxiliary data.
#[derive(Debug, Clone)]
pub struct UnifiedEntity {
    /// Physical id of this entity.
    pub id: EntityId,
    /// Explicit logical id, if any. `UnifiedEntity::logical_id` falls back to
    /// `id` when this is `None`.
    logical_id: Option<EntityId>,
    /// Storage model and locating information.
    pub kind: EntityKind,
    /// Unix time in seconds, set by `UnifiedEntity::new`.
    pub created_at: u64,
    /// Unix time in seconds. Set by `UnifiedEntity::new` and refreshed when
    /// embeddings or cross-refs are added; read by `is_stale`.
    pub updated_at: u64,
    /// Payload.
    pub data: EntityData,
    /// Initialised to 0 by `UnifiedEntity::new`. NOTE(review): assigned and
    /// interpreted outside this file — confirm its meaning.
    pub sequence_id: u64,
    /// Field-name bit mask computed by `compute_entity_field_bloom` at
    /// construction. It is not recomputed here if `data` is later mutated.
    pub field_bloom: u64,
    /// Lower MVCC bound checked by `is_visible`: when non-zero, the entity is
    /// hidden from snapshots older than `xmin`. `0` means unset.
    pub xmin: u64,
    /// Upper MVCC bound checked by `is_visible`: when non-zero, the entity is
    /// hidden from snapshots at or after `xmax`. `0` means unset.
    pub xmax: u64,
    /// Out-of-line embeddings and cross-references. Allocated on first
    /// mutable access via `embeddings_mut` / `cross_refs_mut`.
    aux: Option<Box<EntityAux>>,
}
563
/// Auxiliary per-entity data.
///
/// Stored out of line (`Option<Box<EntityAux>>`) in [`UnifiedEntity`] and
/// created only when first mutably accessed.
#[derive(Debug, Clone, Default)]
pub struct EntityAux {
    /// Named embeddings, in insertion order.
    pub embeddings: Vec<EmbeddingSlot>,
    /// Outgoing cross-references, in insertion order.
    pub cross_refs: Vec<CrossRef>,
}
572
573impl UnifiedEntity {
574 pub fn embeddings(&self) -> &[EmbeddingSlot] {
576 self.aux
577 .as_ref()
578 .map(|a| a.embeddings.as_slice())
579 .unwrap_or(&[])
580 }
581
582 pub fn cross_refs(&self) -> &[CrossRef] {
584 self.aux
585 .as_ref()
586 .map(|a| a.cross_refs.as_slice())
587 .unwrap_or(&[])
588 }
589
590 pub fn embeddings_mut(&mut self) -> &mut Vec<EmbeddingSlot> {
592 &mut self.aux.get_or_insert_with(Default::default).embeddings
593 }
594
595 pub fn cross_refs_mut(&mut self) -> &mut Vec<CrossRef> {
597 &mut self.aux.get_or_insert_with(Default::default).cross_refs
598 }
599
600 pub fn has_aux(&self) -> bool {
602 self.aux.is_some()
603 }
604}
605
/// Maps a field name to a single bit of a 64-bit mask.
///
/// The bit index is the low 6 bits of the name's middle byte, i.e. byte
/// `len / 2` of its UTF-8 encoding. The empty name maps to `0` (no bit).
/// Many distinct names share a bit, so a set bit means only "some field
/// may map here".
///
/// NOTE(review): `UnifiedEntity::field_bloom` is built from this function.
/// If those masks are persisted, changing the bit choice would invalidate
/// them — confirm before altering.
#[inline]
pub fn field_name_bloom(name: &str) -> u64 {
    let bytes = name.as_bytes();
    match bytes.get(bytes.len() / 2) {
        Some(&middle) => 1u64 << (middle & 63),
        None => 0,
    }
}
619
620pub fn compute_entity_field_bloom(data: &EntityData) -> u64 {
624 match data {
625 EntityData::Row(row) => {
626 if row.schema.is_some() {
627 return 0;
630 }
631 if let Some(named) = &row.named {
632 named.keys().fold(0u64, |acc, k| acc | field_name_bloom(k))
633 } else {
634 0
635 }
636 }
637 EntityData::Node(node) => node
638 .properties
639 .keys()
640 .fold(0u64, |acc, k| acc | field_name_bloom(k)),
641 EntityData::Edge(edge) => edge
642 .properties
643 .keys()
644 .fold(0u64, |acc, k| acc | field_name_bloom(k)),
645 _ => 0,
647 }
648}
649
650impl UnifiedEntity {
651 pub fn new(id: EntityId, kind: EntityKind, data: EntityData) -> Self {
653 let now = current_unix_secs();
654 let field_bloom = compute_entity_field_bloom(&data);
655
656 Self {
657 id,
658 logical_id: None,
659 kind,
660 created_at: now,
661 updated_at: now,
662 data,
663 sequence_id: 0,
664 field_bloom,
665 xmin: 0,
668 xmax: 0,
669 aux: None,
670 }
671 }
672
673 #[inline]
684 pub fn is_visible(&self, snapshot_xid: u64) -> bool {
685 if self.xmin != 0 && self.xmin > snapshot_xid {
686 return false;
687 }
688 if self.xmax != 0 && self.xmax <= snapshot_xid {
689 return false;
690 }
691 true
692 }
693
694 #[inline]
697 pub fn set_xmin(&mut self, xid: u64) {
698 self.xmin = xid;
699 }
700
701 #[inline]
705 pub fn set_xmax(&mut self, xid: u64) {
706 self.xmax = xid;
707 }
708
709 #[inline]
712 pub fn logical_id(&self) -> EntityId {
713 self.logical_id.unwrap_or(self.id)
714 }
715
716 #[inline]
718 pub fn has_explicit_logical_id(&self) -> bool {
719 self.logical_id.is_some()
720 }
721
722 #[inline]
724 pub fn set_logical_id(&mut self, logical_id: EntityId) {
725 self.logical_id = Some(logical_id);
726 }
727
728 #[inline]
732 pub(crate) fn ensure_table_logical_id(&mut self) {
733 if matches!(self.kind, EntityKind::TableRow { .. }) && self.logical_id.is_none() {
734 self.logical_id = Some(self.id);
735 }
736 }
737
738 pub fn table_row(
740 id: EntityId,
741 table: impl Into<Arc<str>>,
742 row_id: u64,
743 columns: Vec<Value>,
744 ) -> Self {
745 Self::new(
746 id,
747 EntityKind::TableRow {
748 table: table.into(),
749 row_id,
750 },
751 EntityData::Row(RowData::new(columns)),
752 )
753 }
754
755 pub fn graph_node(
757 id: EntityId,
758 label: impl Into<String>,
759 node_type: impl Into<String>,
760 properties: HashMap<String, Value>,
761 ) -> Self {
762 Self::new(
763 id,
764 EntityKind::GraphNode(Box::new(GraphNodeKind {
765 label: label.into(),
766 node_type: node_type.into(),
767 })),
768 EntityData::Node(NodeData::with_properties(properties)),
769 )
770 }
771
772 pub fn graph_edge(
774 id: EntityId,
775 label: impl Into<String>,
776 from: impl Into<String>,
777 to: impl Into<String>,
778 weight: f32,
779 properties: HashMap<String, Value>,
780 ) -> Self {
781 Self::new(
782 id,
783 EntityKind::GraphEdge(Box::new(GraphEdgeKind {
784 label: label.into(),
785 from_node: from.into(),
786 to_node: to.into(),
787 weight: (weight * 1000.0) as u32,
788 })),
789 EntityData::Edge(EdgeData::with_properties(weight, properties)),
790 )
791 }
792
793 pub fn vector(id: EntityId, collection: impl Into<String>, vector: Vec<f32>) -> Self {
795 Self::new(
796 id,
797 EntityKind::Vector {
798 collection: collection.into(),
799 },
800 EntityData::Vector(VectorData::new(vector)),
801 )
802 }
803
804 pub fn add_embedding(&mut self, slot: EmbeddingSlot) {
806 self.embeddings_mut().push(slot);
807 self.touch();
808 }
809
810 pub fn add_cross_ref(&mut self, cross_ref: CrossRef) {
812 self.cross_refs_mut().push(cross_ref);
813 self.touch();
814 }
815
816 pub fn get_embedding(&self, name: &str) -> Option<&EmbeddingSlot> {
818 self.embeddings().iter().find(|e| e.name == name)
819 }
820
821 fn touch(&mut self) {
823 self.updated_at = current_unix_secs();
824 }
825
826 pub fn is_stale(&self, max_age_secs: u64) -> bool {
828 let now = current_unix_secs();
829 now.saturating_sub(self.updated_at) > max_age_secs
830 }
831}
832
/// Typed, weighted link from one entity to another.
///
/// NOTE: the derived `PartialEq` compares every field, including
/// `created_at`. Two otherwise-identical refs created in different seconds
/// therefore compare unequal.
#[derive(Debug, Clone, PartialEq)]
pub struct CrossRef {
    /// Entity the reference originates from.
    pub source: EntityId,
    /// Entity the reference points to.
    pub target: EntityId,
    /// Name of the collection `target` belongs to.
    pub target_collection: String,
    /// Kind of relationship.
    pub ref_type: RefType,
    /// Link strength; `CrossRef::new` uses `1.0`.
    pub weight: f32,
    /// Unix time in seconds, set at construction by `CrossRef::new`.
    pub created_at: u64,
}
849
850impl CrossRef {
851 pub fn new(
853 source: EntityId,
854 target: EntityId,
855 target_collection: impl Into<String>,
856 ref_type: RefType,
857 ) -> Self {
858 Self {
859 source,
860 target,
861 target_collection: target_collection.into(),
862 ref_type,
863 weight: 1.0,
864 created_at: current_unix_secs(),
865 }
866 }
867
868 pub fn with_weight(
870 source: EntityId,
871 target: EntityId,
872 target_collection: impl Into<String>,
873 ref_type: RefType,
874 weight: f32,
875 ) -> Self {
876 let mut cr = Self::new(source, target, target_collection, ref_type);
877 cr.weight = weight;
878 cr
879 }
880}
881
/// Kind of relationship recorded by a cross-reference.
///
/// Each variant has a fixed one-byte encoding, spelled out explicitly in
/// [`RefType::to_byte`] / [`RefType::try_from_byte`]. Reordering the
/// variants therefore does not change the encoding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RefType {
    /// Table row -> graph node (inverse: `NodeToRow`).
    RowToNode,
    /// Table row -> graph edge (no inverse defined).
    RowToEdge,
    /// Graph node -> table row (inverse: `RowToNode`).
    NodeToRow,
    /// Table row -> vector (inverse: `VectorToRow`).
    RowToVector,
    /// Vector -> table row (inverse: `RowToVector`).
    VectorToRow,
    /// Graph node -> vector (inverse: `VectorToNode`).
    NodeToVector,
    /// Graph edge -> vector (no inverse defined).
    EdgeToVector,
    /// Vector -> graph node (inverse: `NodeToVector`).
    VectorToNode,
    /// Symmetric similarity link (its own inverse).
    SimilarTo,
    /// Symmetric generic relation (its own inverse). Also the fallback of
    /// `from_byte` for unknown bytes.
    RelatedTo,
    /// Directional, no inverse defined.
    DerivesFrom,
    /// Directional, no inverse defined.
    Mentions,
    /// Directional, no inverse defined.
    Contains,
    /// Directional, no inverse defined.
    DependsOn,
}

impl RefType {
    /// The relationship as seen from the target's side, if one is defined.
    ///
    /// Row/node, row/vector and node/vector pairs invert into each other,
    /// and the symmetric types are their own inverse. Every other type
    /// returns `None`.
    pub fn inverse(&self) -> Option<Self> {
        match self {
            Self::RowToNode => Some(Self::NodeToRow),
            Self::NodeToRow => Some(Self::RowToNode),
            Self::RowToVector => Some(Self::VectorToRow),
            Self::VectorToRow => Some(Self::RowToVector),
            Self::NodeToVector => Some(Self::VectorToNode),
            Self::VectorToNode => Some(Self::NodeToVector),
            Self::SimilarTo | Self::RelatedTo => Some(*self),
            // Listed explicitly (no `_` arm) so a new variant forces a decision here.
            Self::RowToEdge
            | Self::EdgeToVector
            | Self::DerivesFrom
            | Self::Mentions
            | Self::Contains
            | Self::DependsOn => None,
        }
    }

    /// Returns `true` for relationships that are their own inverse.
    pub fn is_symmetric(&self) -> bool {
        matches!(self, Self::SimilarTo | Self::RelatedTo)
    }

    /// Stable one-byte encoding (0..=13).
    pub fn to_byte(&self) -> u8 {
        match self {
            Self::RowToNode => 0,
            Self::RowToEdge => 1,
            Self::NodeToRow => 2,
            Self::RowToVector => 3,
            Self::VectorToRow => 4,
            Self::NodeToVector => 5,
            Self::EdgeToVector => 6,
            Self::VectorToNode => 7,
            Self::SimilarTo => 8,
            Self::RelatedTo => 9,
            Self::DerivesFrom => 10,
            Self::Mentions => 11,
            Self::Contains => 12,
            Self::DependsOn => 13,
        }
    }

    /// Decodes a byte produced by [`RefType::to_byte`].
    ///
    /// Returns `None` for bytes that do not correspond to any variant, so
    /// callers can detect corrupt or newer encodings.
    pub fn try_from_byte(byte: u8) -> Option<Self> {
        let ref_type = match byte {
            0 => Self::RowToNode,
            1 => Self::RowToEdge,
            2 => Self::NodeToRow,
            3 => Self::RowToVector,
            4 => Self::VectorToRow,
            5 => Self::NodeToVector,
            6 => Self::EdgeToVector,
            7 => Self::VectorToNode,
            8 => Self::SimilarTo,
            9 => Self::RelatedTo,
            10 => Self::DerivesFrom,
            11 => Self::Mentions,
            12 => Self::Contains,
            13 => Self::DependsOn,
            _ => return None,
        };
        Some(ref_type)
    }

    /// Decodes a byte produced by [`RefType::to_byte`], falling back to
    /// `RelatedTo` for unknown bytes.
    ///
    /// The fallback is lossy: an unknown byte is indistinguishable from a
    /// genuine `RelatedTo`. Use [`RefType::try_from_byte`] when that matters.
    pub fn from_byte(byte: u8) -> Self {
        Self::try_from_byte(byte).unwrap_or(Self::RelatedTo)
    }
}
970
971impl From<Vec<Value>> for RowData {
973 fn from(columns: Vec<Value>) -> Self {
974 RowData::new(columns)
975 }
976}
977
978impl From<HashMap<String, Value>> for NodeData {
980 fn from(properties: HashMap<String, Value>) -> Self {
981 NodeData::with_properties(properties)
982 }
983}
984
985impl From<Vec<f32>> for VectorData {
987 fn from(dense: Vec<f32>) -> Self {
988 VectorData::new(dense)
989 }
990}
991
992impl From<(Vec<f32>, SparseVector)> for VectorData {
994 fn from((dense, sparse): (Vec<f32>, SparseVector)) -> Self {
995 VectorData::with_sparse(dense, sparse)
996 }
997}
998
999impl UnifiedEntity {
1001 pub fn from_properties(
1003 id: EntityId,
1004 label: impl Into<String>,
1005 node_type: impl Into<String>,
1006 properties: impl IntoIterator<Item = (impl Into<String>, Value)>,
1007 ) -> Self {
1008 let props: HashMap<String, Value> =
1009 properties.into_iter().map(|(k, v)| (k.into(), v)).collect();
1010 Self::graph_node(id, label, node_type, props)
1011 }
1012
1013 pub fn into_row(self) -> Option<RowData> {
1015 match self.data {
1016 EntityData::Row(r) => Some(r),
1017 _ => None,
1018 }
1019 }
1020
1021 pub fn into_node(self) -> Option<NodeData> {
1023 match self.data {
1024 EntityData::Node(n) => Some(n),
1025 _ => None,
1026 }
1027 }
1028
1029 pub fn into_edge(self) -> Option<EdgeData> {
1031 match self.data {
1032 EntityData::Edge(e) => Some(e),
1033 _ => None,
1034 }
1035 }
1036
1037 pub fn into_vector(self) -> Option<VectorData> {
1039 match self.data {
1040 EntityData::Vector(v) => Some(v),
1041 _ => None,
1042 }
1043 }
1044}
1045
#[cfg(test)]
mod tests {
    use super::*;

    /// A table row reports the `table` storage type and its table as the collection.
    #[test]
    fn test_entity_creation() {
        let row_values = vec![Value::text("alice".to_string()), Value::Integer(25)];
        let entity = UnifiedEntity::table_row(EntityId::new(1), "users", 100, row_values);

        assert!(entity.data.is_row());
        assert_eq!(entity.kind.storage_type(), "table");
        assert_eq!(entity.kind.collection(), "users");
    }

    /// A cross-ref keeps its endpoints, and row->node inverts to node->row.
    #[test]
    fn test_cross_refs() {
        let (source, target) = (EntityId::new(1), EntityId::new(2));
        let link = CrossRef::new(source, target, "nodes", RefType::RowToNode);

        assert_eq!(link.source, source);
        assert_eq!(link.target, target);
        assert_eq!(link.ref_type.inverse(), Some(RefType::NodeToRow));
    }

    /// Stored entries are returned, and missing ones read as zero.
    #[test]
    fn test_sparse_vector() {
        let sv = SparseVector::new(vec![0, 5, 10], vec![1.0, 2.0, 3.0], 100);

        assert_eq!(sv.nnz(), 3);
        assert_eq!(sv.get(5), 2.0);
        assert_eq!(sv.get(3), 0.0);
        assert!(sv.sparsity() > 0.9);
    }

    /// Embeddings are retrievable by name after being added.
    #[test]
    fn test_embedding_slots() {
        let mut doc = UnifiedEntity::table_row(
            EntityId::new(1),
            "documents",
            1,
            vec![Value::text("Hello world".to_string())],
        );
        let slot = EmbeddingSlot::new("content", vec![0.1, 0.2, 0.3], "text-embedding-3-small");
        doc.add_embedding(slot);

        assert_eq!(doc.embeddings().len(), 1);
        assert!(doc.get_embedding("content").is_some());
        assert!(doc.get_embedding("summary").is_none());
    }
}