1use std::collections::HashMap;
7use std::fmt;
8use std::sync::Arc;
9
10use crate::storage::schema::Value;
11
/// Numeric identifier of an entity.
///
/// A `Copy` newtype over `u64`; equality, ordering and hashing all defer to
/// the wrapped integer.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct EntityId(pub u64);

impl EntityId {
    /// Wraps a raw `u64` as an [`EntityId`].
    pub fn new(id: u64) -> Self {
        EntityId(id)
    }

    /// Returns the wrapped integer.
    pub fn raw(&self) -> u64 {
        let EntityId(value) = *self;
        value
    }
}

impl fmt::Display for EntityId {
    /// Writes the id as the letter `e` followed by the number, e.g. `e42`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("e{}", self.raw()))
    }
}

impl From<u64> for EntityId {
    /// Equivalent to [`EntityId::new`].
    fn from(id: u64) -> Self {
        Self::new(id)
    }
}
39
/// Identifies the storage model an entity lives in, together with the
/// addressing details for that model.
///
/// The graph and time-series details are kept in separate boxed structs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EntityKind {
    /// Row `row_id` of the table named `table`.
    TableRow { table: Arc<str>, row_id: u64 },
    /// Graph node; see [`GraphNodeKind`].
    GraphNode(Box<GraphNodeKind>),
    /// Graph edge; see [`GraphEdgeKind`].
    GraphEdge(Box<GraphEdgeKind>),
    /// Vector stored in the named collection.
    Vector { collection: String },
    /// Time-series point; see [`TimeSeriesPointKind`].
    TimeSeriesPoint(Box<TimeSeriesPointKind>),
    /// Message at `position` in the named queue.
    QueueMessage { queue: String, position: u64 },
}

/// Addressing details of a graph node.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GraphNodeKind {
    /// Node label; also reported by [`EntityKind::collection`].
    pub label: String,
    /// Node type name.
    pub node_type: String,
}

/// Addressing details of a graph edge.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GraphEdgeKind {
    /// Edge label; also reported by [`EntityKind::collection`].
    pub label: String,
    /// Identifier of the node the edge starts at.
    pub from_node: String,
    /// Identifier of the node the edge ends at.
    pub to_node: String,
    /// Integer weight. The `graph_edge` constructor in this file stores the
    /// floating-point weight multiplied by 1000 and cast to `u32`.
    pub weight: u32,
}

/// Addressing details of a time-series point.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TimeSeriesPointKind {
    /// Series name; also reported by [`EntityKind::collection`].
    pub series: String,
    /// Metric name.
    pub metric: String,
}

impl EntityKind {
    /// Returns a fixed tag naming the storage model of this kind.
    ///
    /// The tag is one of `"table"`, `"graph_node"`, `"graph_edge"`,
    /// `"vector"`, `"timeseries"` or `"queue"`.
    pub fn storage_type(&self) -> &'static str {
        match self {
            EntityKind::TableRow { .. } => "table",
            EntityKind::GraphNode(_) => "graph_node",
            EntityKind::GraphEdge(_) => "graph_edge",
            EntityKind::Vector { .. } => "vector",
            EntityKind::TimeSeriesPoint(_) => "timeseries",
            EntityKind::QueueMessage { .. } => "queue",
        }
    }

    /// Returns the name of the container the entity belongs to.
    ///
    /// That is the table name, the node or edge label, the vector
    /// collection, the series name or the queue name, depending on the
    /// variant.
    pub fn collection(&self) -> &str {
        match self {
            EntityKind::TableRow { table, .. } => &**table,
            EntityKind::GraphNode(node) => node.label.as_str(),
            EntityKind::GraphEdge(edge) => edge.label.as_str(),
            EntityKind::Vector { collection } => collection.as_str(),
            EntityKind::TimeSeriesPoint(point) => point.series.as_str(),
            EntityKind::QueueMessage { queue, .. } => queue.as_str(),
        }
    }
}
102
103#[derive(Debug, Clone)]
105pub enum EntityData {
106 Row(RowData),
108 Node(NodeData),
110 Edge(EdgeData),
112 Vector(VectorData),
114 TimeSeries(TimeSeriesData),
116 QueueMessage(QueueMessageData),
118}
119
120impl EntityData {
121 pub fn is_row(&self) -> bool {
123 matches!(self, Self::Row(_))
124 }
125
126 pub fn is_node(&self) -> bool {
128 matches!(self, Self::Node(_))
129 }
130
131 pub fn is_edge(&self) -> bool {
133 matches!(self, Self::Edge(_))
134 }
135
136 pub fn is_vector(&self) -> bool {
138 matches!(self, Self::Vector(_))
139 }
140
141 pub fn as_row(&self) -> Option<&RowData> {
143 match self {
144 Self::Row(r) => Some(r),
145 _ => None,
146 }
147 }
148
149 pub fn as_node(&self) -> Option<&NodeData> {
151 match self {
152 Self::Node(n) => Some(n),
153 _ => None,
154 }
155 }
156
157 pub fn as_edge(&self) -> Option<&EdgeData> {
159 match self {
160 Self::Edge(e) => Some(e),
161 _ => None,
162 }
163 }
164
165 pub fn as_vector(&self) -> Option<&VectorData> {
167 match self {
168 Self::Vector(v) => Some(v),
169 _ => None,
170 }
171 }
172}
173
174#[derive(Debug, Clone)]
176pub struct RowData {
177 pub columns: Vec<Value>,
179 pub named: Option<HashMap<String, Value>>,
181 pub schema: Option<std::sync::Arc<Vec<String>>>,
185}
186
187impl RowData {
188 pub fn new(columns: Vec<Value>) -> Self {
190 Self {
191 columns,
192 named: None,
193 schema: None,
194 }
195 }
196
197 pub fn with_names(columns: Vec<Value>, names: Vec<String>) -> Self {
199 let named: HashMap<String, Value> =
200 names.into_iter().zip(columns.iter().cloned()).collect();
201 Self {
202 columns,
203 named: Some(named),
204 schema: None,
205 }
206 }
207
208 pub fn get_field(&self, name: &str) -> Option<&Value> {
210 if let Some(ref named) = self.named {
212 return named.get(name);
213 }
214 if let Some(ref schema) = self.schema {
216 if let Some(idx) = schema.iter().position(|s| s == name) {
217 return self.columns.get(idx);
218 }
219 }
220 None
221 }
222
223 pub fn iter_fields(&self) -> Box<dyn Iterator<Item = (&str, &Value)> + '_> {
225 if let Some(ref named) = self.named {
226 Box::new(named.iter().map(|(k, v)| (k.as_str(), v)))
227 } else if let Some(ref schema) = self.schema {
228 Box::new(
229 schema
230 .iter()
231 .zip(self.columns.iter())
232 .map(|(k, v)| (k.as_str(), v)),
233 )
234 } else {
235 Box::new(std::iter::empty())
236 }
237 }
238
239 pub fn get(&self, index: usize) -> Option<&Value> {
241 self.columns.get(index)
242 }
243
244 pub fn get_by_name(&self, name: &str) -> Option<&Value> {
246 self.named.as_ref()?.get(name)
247 }
248
249 pub fn len(&self) -> usize {
251 self.columns.len()
252 }
253
254 pub fn is_empty(&self) -> bool {
256 self.columns.is_empty()
257 }
258}
259
260#[derive(Debug, Clone)]
262pub struct NodeData {
263 pub properties: HashMap<String, Value>,
265}
266
267impl NodeData {
268 pub fn new() -> Self {
270 Self {
271 properties: HashMap::new(),
272 }
273 }
274
275 pub fn with_properties(properties: HashMap<String, Value>) -> Self {
277 Self { properties }
278 }
279
280 pub fn set(&mut self, key: impl Into<String>, value: Value) {
282 self.properties.insert(key.into(), value);
283 }
284
285 pub fn get(&self, key: &str) -> Option<&Value> {
287 self.properties.get(key)
288 }
289
290 pub fn has(&self, key: &str) -> bool {
292 self.properties.contains_key(key)
293 }
294}
295
296impl Default for NodeData {
297 fn default() -> Self {
298 Self::new()
299 }
300}
301
302#[derive(Debug, Clone)]
304pub struct EdgeData {
305 pub properties: HashMap<String, Value>,
307 pub weight: f32,
309}
310
311impl EdgeData {
312 pub fn new(weight: f32) -> Self {
314 Self {
315 properties: HashMap::new(),
316 weight,
317 }
318 }
319
320 pub fn with_properties(weight: f32, properties: HashMap<String, Value>) -> Self {
322 Self { properties, weight }
323 }
324
325 pub fn set(&mut self, key: impl Into<String>, value: Value) {
327 self.properties.insert(key.into(), value);
328 }
329
330 pub fn get(&self, key: &str) -> Option<&Value> {
332 self.properties.get(key)
333 }
334}
335
336impl Default for EdgeData {
337 fn default() -> Self {
338 Self::new(1.0)
339 }
340}
341
342#[derive(Debug, Clone)]
344pub struct VectorData {
345 pub dense: Vec<f32>,
347 pub sparse: Option<SparseVector>,
349 pub content: Option<String>,
351}
352
353impl VectorData {
354 pub fn new(dense: Vec<f32>) -> Self {
356 Self {
357 dense,
358 sparse: None,
359 content: None,
360 }
361 }
362
363 pub fn with_sparse(dense: Vec<f32>, sparse: SparseVector) -> Self {
365 Self {
366 dense,
367 sparse: Some(sparse),
368 content: None,
369 }
370 }
371
372 pub fn with_content(mut self, content: impl Into<String>) -> Self {
374 self.content = Some(content.into());
375 self
376 }
377
378 pub fn dimension(&self) -> usize {
380 self.dense.len()
381 }
382
383 pub fn is_hybrid(&self) -> bool {
385 self.sparse.is_some()
386 }
387}
388
/// Payload carried by `EntityData::TimeSeries`: one metric sample with tags.
#[derive(Debug, Clone)]
pub struct TimeSeriesData {
    /// Name of the metric the sample belongs to.
    pub metric: String,
    /// Sample timestamp; presumably nanoseconds since the Unix epoch, going
    /// by the `_ns` suffix — TODO confirm against the writer.
    pub timestamp_ns: u64,
    /// Sampled value.
    pub value: f64,
    /// String key/value tags attached to the sample.
    pub tags: std::collections::HashMap<String, String>,
}
401
/// Payload carried by `EntityData::QueueMessage`: the message body plus
/// delivery bookkeeping fields.
#[derive(Debug, Clone)]
pub struct QueueMessageData {
    /// Message body.
    pub payload: Value,
    /// Optional priority. How priorities are ordered is decided by the queue
    /// code, which is not visible in this file.
    pub priority: Option<i32>,
    /// Enqueue time; presumably nanoseconds since the Unix epoch, going by
    /// the `_ns` suffix — TODO confirm against the producer.
    pub enqueued_at_ns: u64,
    /// Attempt counter. NOTE(review): presumably the number of delivery
    /// attempts so far; nothing in this file updates it — confirm.
    pub attempts: u32,
    /// NOTE(review): looks like the attempt limit before the message is
    /// given up on — confirm against the consumer code.
    pub max_attempts: u32,
    /// Acknowledgement flag.
    pub acked: bool,
}
418
/// Sparse vector stored as parallel `indices` / `values` arrays.
///
/// `indices[k]` is the position of the stored entry whose value is
/// `values[k]`; positions that are not listed read as `0.0`.
#[derive(Debug, Clone)]
pub struct SparseVector {
    /// Positions of the stored entries.
    pub indices: Vec<u32>,
    /// Values of the stored entries, index-aligned with `indices`.
    pub values: Vec<f32>,
    /// Declared length of the full vector; used as the denominator of
    /// [`SparseVector::sparsity`].
    pub dimension: usize,
}

impl SparseVector {
    /// Builds a sparse vector from parallel index/value arrays.
    ///
    /// The two arrays are expected to have equal length. This is only
    /// checked with `debug_assert_eq!`, so release builds accept mismatched
    /// input, and the public fields can be edited afterwards in any case.
    pub fn new(indices: Vec<u32>, values: Vec<f32>, dimension: usize) -> Self {
        debug_assert_eq!(indices.len(), values.len());
        Self {
            indices,
            values,
            dimension,
        }
    }

    /// Number of stored entries (the length of `indices`).
    pub fn nnz(&self) -> usize {
        self.indices.len()
    }

    /// Fraction of positions that are not stored: `1 - nnz / dimension`.
    ///
    /// Returns `1.0` when `dimension` is zero to avoid dividing by zero.
    pub fn sparsity(&self) -> f32 {
        if self.dimension == 0 {
            1.0
        } else {
            1.0 - (self.nnz() as f32 / self.dimension as f32)
        }
    }

    /// Returns the value stored at position `index`, or `0.0` when the
    /// position is not stored.
    ///
    /// The lookup is a linear scan and the first matching entry wins. An
    /// index that has no paired value (because `values` is shorter than
    /// `indices`) is treated as absent rather than panicking.
    pub fn get(&self, index: u32) -> f32 {
        // Zipping pairs each index with its value and stops at the shorter
        // array, so a length mismatch can never index out of bounds.
        self.indices
            .iter()
            .zip(self.values.iter())
            .find(|&(&stored, _)| stored == index)
            .map(|(_, &value)| value)
            .unwrap_or(0.0)
    }
}
464
/// A named embedding vector together with the model that produced it.
#[derive(Debug, Clone)]
pub struct EmbeddingSlot {
    /// Slot name, used for lookup.
    pub name: String,
    /// Embedding components.
    pub vector: Vec<f32>,
    /// Name of the model that generated the embedding.
    pub model: String,
    /// Number of components; `new` sets it to `vector.len()`.
    pub dimension: usize,
    /// Creation time in seconds since the Unix epoch, as set by `new`.
    pub generated_at: u64,
}

/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Falls back to `0` when the system clock reports a time before the epoch.
fn current_unix_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}

impl EmbeddingSlot {
    /// Creates a slot, deriving `dimension` from the vector length and
    /// stamping `generated_at` with the current time.
    pub fn new(name: impl Into<String>, vector: Vec<f32>, model: impl Into<String>) -> Self {
        EmbeddingSlot {
            dimension: vector.len(),
            generated_at: current_unix_secs(),
            name: name.into(),
            model: model.into(),
            vector,
        }
    }
}
500
/// An entity of any storage model: identity, kind, payload, timestamps and
/// versioning fields, plus optional auxiliary data.
#[derive(Debug, Clone)]
pub struct UnifiedEntity {
    /// Identifier of the entity.
    pub id: EntityId,
    /// Storage model and addressing details.
    pub kind: EntityKind,
    /// Creation time in seconds since the Unix epoch (set by `new`).
    pub created_at: u64,
    /// Last-modification time in seconds since the Unix epoch; refreshed
    /// when an embedding or cross-reference is added.
    pub updated_at: u64,
    /// Model-specific payload.
    pub data: EntityData,
    /// Sequence number; `new` initialises it to 0. Its assignment policy is
    /// not visible in this file.
    pub sequence_id: u64,
    /// Bitmask of field names as computed by `compute_entity_field_bloom`
    /// at construction time.
    pub field_bloom: u64,
    /// Creating transaction id as used by `is_visible`: 0 means unset; a
    /// non-zero value hides the entity from snapshots with a smaller xid.
    pub xmin: u64,
    /// Deleting transaction id as used by `is_visible`: 0 means unset; a
    /// non-zero value hides the entity from snapshots whose xid is greater
    /// than or equal to it.
    pub xmax: u64,
    /// Embeddings and cross-references; stays `None` until one of the
    /// `*_mut` accessors allocates it.
    aux: Option<Box<EntityAux>>,
}
546
/// Optional side data of a `UnifiedEntity`, stored behind a `Box` so that
/// entities without it only carry an `Option` of a pointer.
#[derive(Debug, Clone, Default)]
pub struct EntityAux {
    /// Named embedding vectors attached to the entity.
    pub embeddings: Vec<EmbeddingSlot>,
    /// Cross-references attached to the entity.
    pub cross_refs: Vec<CrossRef>,
}
555
556impl UnifiedEntity {
557 pub fn embeddings(&self) -> &[EmbeddingSlot] {
559 self.aux
560 .as_ref()
561 .map(|a| a.embeddings.as_slice())
562 .unwrap_or(&[])
563 }
564
565 pub fn cross_refs(&self) -> &[CrossRef] {
567 self.aux
568 .as_ref()
569 .map(|a| a.cross_refs.as_slice())
570 .unwrap_or(&[])
571 }
572
573 pub fn embeddings_mut(&mut self) -> &mut Vec<EmbeddingSlot> {
575 &mut self.aux.get_or_insert_with(Default::default).embeddings
576 }
577
578 pub fn cross_refs_mut(&mut self) -> &mut Vec<CrossRef> {
580 &mut self.aux.get_or_insert_with(Default::default).cross_refs
581 }
582
583 pub fn has_aux(&self) -> bool {
585 self.aux.is_some()
586 }
587}
588
/// Maps a field name to a single-bit mask.
///
/// The bit position is the low six bits of the name's middle byte (the byte
/// at index `len / 2`). An empty name maps to `0`, i.e. no bit set.
#[inline]
pub fn field_name_bloom(name: &str) -> u64 {
    let bytes = name.as_bytes();
    match bytes.get(bytes.len() / 2) {
        Some(&middle) => 1u64 << (middle & 63),
        // Only reachable for an empty name: `len / 2 < len` otherwise.
        None => 0,
    }
}
602
603pub fn compute_entity_field_bloom(data: &EntityData) -> u64 {
607 match data {
608 EntityData::Row(row) => {
609 if row.schema.is_some() {
610 return 0;
613 }
614 if let Some(named) = &row.named {
615 named.keys().fold(0u64, |acc, k| acc | field_name_bloom(k))
616 } else {
617 0
618 }
619 }
620 EntityData::Node(node) => node
621 .properties
622 .keys()
623 .fold(0u64, |acc, k| acc | field_name_bloom(k)),
624 EntityData::Edge(edge) => edge
625 .properties
626 .keys()
627 .fold(0u64, |acc, k| acc | field_name_bloom(k)),
628 _ => 0,
630 }
631}
632
633impl UnifiedEntity {
634 pub fn new(id: EntityId, kind: EntityKind, data: EntityData) -> Self {
636 let now = current_unix_secs();
637 let field_bloom = compute_entity_field_bloom(&data);
638
639 Self {
640 id,
641 kind,
642 created_at: now,
643 updated_at: now,
644 data,
645 sequence_id: 0,
646 field_bloom,
647 xmin: 0,
650 xmax: 0,
651 aux: None,
652 }
653 }
654
655 #[inline]
666 pub fn is_visible(&self, snapshot_xid: u64) -> bool {
667 if self.xmin != 0 && self.xmin > snapshot_xid {
668 return false;
669 }
670 if self.xmax != 0 && self.xmax <= snapshot_xid {
671 return false;
672 }
673 true
674 }
675
676 #[inline]
679 pub fn set_xmin(&mut self, xid: u64) {
680 self.xmin = xid;
681 }
682
683 #[inline]
687 pub fn set_xmax(&mut self, xid: u64) {
688 self.xmax = xid;
689 }
690
691 pub fn table_row(
693 id: EntityId,
694 table: impl Into<Arc<str>>,
695 row_id: u64,
696 columns: Vec<Value>,
697 ) -> Self {
698 Self::new(
699 id,
700 EntityKind::TableRow {
701 table: table.into(),
702 row_id,
703 },
704 EntityData::Row(RowData::new(columns)),
705 )
706 }
707
708 pub fn graph_node(
710 id: EntityId,
711 label: impl Into<String>,
712 node_type: impl Into<String>,
713 properties: HashMap<String, Value>,
714 ) -> Self {
715 Self::new(
716 id,
717 EntityKind::GraphNode(Box::new(GraphNodeKind {
718 label: label.into(),
719 node_type: node_type.into(),
720 })),
721 EntityData::Node(NodeData::with_properties(properties)),
722 )
723 }
724
725 pub fn graph_edge(
727 id: EntityId,
728 label: impl Into<String>,
729 from: impl Into<String>,
730 to: impl Into<String>,
731 weight: f32,
732 properties: HashMap<String, Value>,
733 ) -> Self {
734 Self::new(
735 id,
736 EntityKind::GraphEdge(Box::new(GraphEdgeKind {
737 label: label.into(),
738 from_node: from.into(),
739 to_node: to.into(),
740 weight: (weight * 1000.0) as u32,
741 })),
742 EntityData::Edge(EdgeData::with_properties(weight, properties)),
743 )
744 }
745
746 pub fn vector(id: EntityId, collection: impl Into<String>, vector: Vec<f32>) -> Self {
748 Self::new(
749 id,
750 EntityKind::Vector {
751 collection: collection.into(),
752 },
753 EntityData::Vector(VectorData::new(vector)),
754 )
755 }
756
757 pub fn add_embedding(&mut self, slot: EmbeddingSlot) {
759 self.embeddings_mut().push(slot);
760 self.touch();
761 }
762
763 pub fn add_cross_ref(&mut self, cross_ref: CrossRef) {
765 self.cross_refs_mut().push(cross_ref);
766 self.touch();
767 }
768
769 pub fn get_embedding(&self, name: &str) -> Option<&EmbeddingSlot> {
771 self.embeddings().iter().find(|e| e.name == name)
772 }
773
774 fn touch(&mut self) {
776 self.updated_at = current_unix_secs();
777 }
778
779 pub fn is_stale(&self, max_age_secs: u64) -> bool {
781 let now = current_unix_secs();
782 now.saturating_sub(self.updated_at) > max_age_secs
783 }
784}
785
786#[derive(Debug, Clone, PartialEq)]
788pub struct CrossRef {
789 pub source: EntityId,
791 pub target: EntityId,
793 pub target_collection: String,
795 pub ref_type: RefType,
797 pub weight: f32,
799 pub created_at: u64,
801}
802
803impl CrossRef {
804 pub fn new(
806 source: EntityId,
807 target: EntityId,
808 target_collection: impl Into<String>,
809 ref_type: RefType,
810 ) -> Self {
811 Self {
812 source,
813 target,
814 target_collection: target_collection.into(),
815 ref_type,
816 weight: 1.0,
817 created_at: current_unix_secs(),
818 }
819 }
820
821 pub fn with_weight(
823 source: EntityId,
824 target: EntityId,
825 target_collection: impl Into<String>,
826 ref_type: RefType,
827 weight: f32,
828 ) -> Self {
829 let mut cr = Self::new(source, target, target_collection, ref_type);
830 cr.weight = weight;
831 cr
832 }
833}
834
/// Kind of relationship expressed by a cross-reference.
///
/// The explicit discriminants are the byte encoding used by
/// [`RefType::to_byte`] and [`RefType::from_byte`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RefType {
    RowToNode = 0,
    RowToEdge = 1,
    NodeToRow = 2,
    RowToVector = 3,
    VectorToRow = 4,
    NodeToVector = 5,
    EdgeToVector = 6,
    VectorToNode = 7,
    SimilarTo = 8,
    RelatedTo = 9,
    DerivesFrom = 10,
    Mentions = 11,
    Contains = 12,
    DependsOn = 13,
}

impl RefType {
    /// Returns the reference type that points the other way, if one exists.
    ///
    /// The row/node, row/vector and node/vector pairs map to each other and
    /// the symmetric types map to themselves. `RowToEdge`, `EdgeToVector`
    /// and the remaining semantic types have no inverse and yield `None`.
    pub fn inverse(&self) -> Option<Self> {
        let flipped = match *self {
            RefType::RowToNode => RefType::NodeToRow,
            RefType::NodeToRow => RefType::RowToNode,
            RefType::RowToVector => RefType::VectorToRow,
            RefType::VectorToRow => RefType::RowToVector,
            RefType::NodeToVector => RefType::VectorToNode,
            RefType::VectorToNode => RefType::NodeToVector,
            RefType::SimilarTo | RefType::RelatedTo => *self,
            RefType::RowToEdge
            | RefType::EdgeToVector
            | RefType::DerivesFrom
            | RefType::Mentions
            | RefType::Contains
            | RefType::DependsOn => return None,
        };
        Some(flipped)
    }

    /// Returns `true` for relationships that read the same in both
    /// directions (`SimilarTo` and `RelatedTo`).
    pub fn is_symmetric(&self) -> bool {
        match self {
            RefType::SimilarTo | RefType::RelatedTo => true,
            _ => false,
        }
    }

    /// Encodes the type as a single byte in the range `0..=13`.
    pub fn to_byte(&self) -> u8 {
        *self as u8
    }

    /// Decodes a byte produced by [`RefType::to_byte`].
    ///
    /// Bytes outside `0..=13` decode to `RelatedTo`.
    pub fn from_byte(byte: u8) -> Self {
        // Indexed by encoded byte; must stay aligned with the discriminants.
        const DECODE: [RefType; 14] = [
            RefType::RowToNode,
            RefType::RowToEdge,
            RefType::NodeToRow,
            RefType::RowToVector,
            RefType::VectorToRow,
            RefType::NodeToVector,
            RefType::EdgeToVector,
            RefType::VectorToNode,
            RefType::SimilarTo,
            RefType::RelatedTo,
            RefType::DerivesFrom,
            RefType::Mentions,
            RefType::Contains,
            RefType::DependsOn,
        ];
        DECODE
            .get(usize::from(byte))
            .copied()
            .unwrap_or(RefType::RelatedTo)
    }
}
923
924impl From<Vec<Value>> for RowData {
926 fn from(columns: Vec<Value>) -> Self {
927 RowData::new(columns)
928 }
929}
930
931impl From<HashMap<String, Value>> for NodeData {
933 fn from(properties: HashMap<String, Value>) -> Self {
934 NodeData::with_properties(properties)
935 }
936}
937
938impl From<Vec<f32>> for VectorData {
940 fn from(dense: Vec<f32>) -> Self {
941 VectorData::new(dense)
942 }
943}
944
945impl From<(Vec<f32>, SparseVector)> for VectorData {
947 fn from((dense, sparse): (Vec<f32>, SparseVector)) -> Self {
948 VectorData::with_sparse(dense, sparse)
949 }
950}
951
952impl UnifiedEntity {
954 pub fn from_properties(
956 id: EntityId,
957 label: impl Into<String>,
958 node_type: impl Into<String>,
959 properties: impl IntoIterator<Item = (impl Into<String>, Value)>,
960 ) -> Self {
961 let props: HashMap<String, Value> =
962 properties.into_iter().map(|(k, v)| (k.into(), v)).collect();
963 Self::graph_node(id, label, node_type, props)
964 }
965
966 pub fn into_row(self) -> Option<RowData> {
968 match self.data {
969 EntityData::Row(r) => Some(r),
970 _ => None,
971 }
972 }
973
974 pub fn into_node(self) -> Option<NodeData> {
976 match self.data {
977 EntityData::Node(n) => Some(n),
978 _ => None,
979 }
980 }
981
982 pub fn into_edge(self) -> Option<EdgeData> {
984 match self.data {
985 EntityData::Edge(e) => Some(e),
986 _ => None,
987 }
988 }
989
990 pub fn into_vector(self) -> Option<VectorData> {
992 match self.data {
993 EntityData::Vector(v) => Some(v),
994 _ => None,
995 }
996 }
997}
998
// Unit tests for entity construction, cross-references, sparse vectors and
// embedding slots.
#[cfg(test)]
mod tests {
    use super::*;

    /// A table-row entity reports row data, the "table" storage tag and its
    /// table name as the collection.
    #[test]
    fn test_entity_creation() {
        let id = EntityId::new(1);
        let entity = UnifiedEntity::table_row(
            id,
            "users",
            100,
            vec![Value::text("alice".to_string()), Value::Integer(25)],
        );

        assert!(entity.data.is_row());
        assert_eq!(entity.kind.storage_type(), "table");
        assert_eq!(entity.kind.collection(), "users");
    }

    /// A cross-reference keeps its endpoints, and `RowToNode` inverts to
    /// `NodeToRow`.
    #[test]
    fn test_cross_refs() {
        let id1 = EntityId::new(1);
        let id2 = EntityId::new(2);

        let cross_ref = CrossRef::new(id1, id2, "nodes", RefType::RowToNode);
        assert_eq!(cross_ref.source, id1);
        assert_eq!(cross_ref.target, id2);
        assert_eq!(cross_ref.ref_type.inverse(), Some(RefType::NodeToRow));
    }

    /// Stored positions return their value, missing positions return 0.0,
    /// and 3 entries out of 100 give a sparsity above 0.9.
    #[test]
    fn test_sparse_vector() {
        let sparse = SparseVector::new(vec![0, 5, 10], vec![1.0, 2.0, 3.0], 100);

        assert_eq!(sparse.nnz(), 3);
        assert_eq!(sparse.get(5), 2.0);
        assert_eq!(sparse.get(3), 0.0);
        assert!(sparse.sparsity() > 0.9);
    }

    /// An added embedding can be found by name; unknown names yield `None`.
    #[test]
    fn test_embedding_slots() {
        let mut entity = UnifiedEntity::table_row(
            EntityId::new(1),
            "documents",
            1,
            vec![Value::text("Hello world".to_string())],
        );

        entity.add_embedding(EmbeddingSlot::new(
            "content",
            vec![0.1, 0.2, 0.3],
            "text-embedding-3-small",
        ));

        assert_eq!(entity.embeddings().len(), 1);
        assert!(entity.get_embedding("content").is_some());
        assert!(entity.get_embedding("summary").is_none());
    }
}