velesdb-core 1.13.2

High-performance vector database engine written in Rust
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
//! Graph edge types and storage for knowledge graph relationships.
//!
//! This module provides:
//! - `GraphEdge`: A typed relationship between nodes with properties
//! - `EdgeStore`: Bidirectional index for efficient edge traversal
//!
//! CSR snapshot types are in [`super::csr_snapshot`].
//!
//! # Edge Removal Semantics
//!
//! During edge removal, the internal indexes may be temporarily inconsistent
//! while the operation is in progress. The final state is always consistent.
//! For concurrent access, use `ConcurrentEdgeStore` instead.

use super::csr_snapshot::CsrSnapshot;
use crate::error::{Error, Result};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;

/// A directed edge (relationship) in the knowledge graph.
///
/// An edge connects a `source` node to a `target` node and carries a label
/// (relationship type) plus a map of JSON-valued properties.
///
/// # Example
///
/// ```rust,ignore
/// use velesdb_core::collection::graph::GraphEdge;
/// use serde_json::json;
/// use std::collections::HashMap;
///
/// let mut props = HashMap::new();
/// props.insert("since".to_string(), json!("2020-01-01"));
///
/// // `GraphEdge::new` is fallible (blank labels are rejected), so the
/// // `Result` must be unwrapped before chaining builder methods.
/// let edge = GraphEdge::new(1, 100, 200, "KNOWS")
///     .expect("label is not blank")
///     .with_properties(props);
/// ```
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GraphEdge {
    /// Edge identifier; `EdgeStore` refuses to insert a second edge with the same ID.
    id: u64,
    /// ID of the node this edge starts from.
    source: u64,
    /// ID of the node this edge points to.
    target: u64,
    /// Relationship type. `GraphEdge::new` stores it with surrounding whitespace trimmed.
    label: String,
    /// Property values keyed by property name; empty until `with_properties` is called.
    properties: HashMap<String, Value>,
}

impl GraphEdge {
    /// Creates a new edge with the given ID, endpoints, and label.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidEdgeLabel` if the label is empty or whitespace-only.
    pub fn new(id: u64, source: u64, target: u64, label: &str) -> Result<Self> {
        let trimmed = label.trim();
        if trimmed.is_empty() {
            return Err(Error::InvalidEdgeLabel(
                "Edge label cannot be empty or whitespace-only".to_string(),
            ));
        }
        Ok(Self {
            id,
            source,
            target,
            label: trimmed.to_string(),
            properties: HashMap::new(),
        })
    }

    /// Adds properties to this edge (builder pattern).
    #[must_use]
    pub fn with_properties(mut self, properties: HashMap<String, Value>) -> Self {
        self.properties = properties;
        self
    }

    /// Returns the edge ID.
    #[must_use]
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Returns the source node ID.
    #[must_use]
    pub fn source(&self) -> u64 {
        self.source
    }

    /// Returns the target node ID.
    #[must_use]
    pub fn target(&self) -> u64 {
        self.target
    }

    /// Returns the edge label (relationship type).
    #[must_use]
    pub fn label(&self) -> &str {
        &self.label
    }

    /// Returns all properties of this edge.
    #[must_use]
    pub fn properties(&self) -> &HashMap<String, Value> {
        &self.properties
    }

    /// Returns a specific property value, if it exists.
    #[must_use]
    pub fn property(&self, name: &str) -> Option<&Value> {
        self.properties.get(name)
    }
}

/// Storage for graph edges with bidirectional indexing.
///
/// Provides O(1) access to edges by ID and O(degree) access to
/// outgoing/incoming edges for any node.
///
/// # Index Structure (EPIC-019 US-003)
///
/// - `by_label`: Secondary index for O(k) label-based queries
/// - `outgoing_by_label`: Composite index (source, label) for O(k) filtered traversal
///
/// Both label indexes are only populated for edges inserted with outgoing
/// indexing enabled (`add_edge` and `add_edge_outgoing_only`); edges added via
/// `add_edge_incoming_only` do not appear in them.
///
/// # CSR Snapshot (G1)
///
/// After loading from disk or after explicit `build_read_snapshot()`, the
/// `csr_snapshot` field provides zero-copy `&[u64]` access to neighbor
/// target IDs and edge IDs. Writes invalidate the snapshot automatically.
/// The snapshot is excluded from (de)serialization via `#[serde(skip)]`.
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct EdgeStore {
    /// All edges indexed by ID: `edge_id -> GraphEdge`.
    pub(super) edges: HashMap<u64, GraphEdge>,
    /// Outgoing edges: `source_id -> Vec<edge_id>`.
    pub(super) outgoing: HashMap<u64, Vec<u64>>,
    /// Incoming edges: `target_id -> Vec<edge_id>`.
    pub(super) incoming: HashMap<u64, Vec<u64>>,
    /// Secondary index: `label -> Vec<edge_id>` for fast label queries.
    pub(super) by_label: HashMap<String, Vec<u64>>,
    /// Composite index: `(source_id, label) -> Vec<edge_id>` for fast filtered traversal.
    pub(super) outgoing_by_label: HashMap<(u64, String), Vec<u64>>,
    /// Zero-copy CSR snapshot for BFS traversal (G1).
    /// Built on-demand via `build_read_snapshot()`, invalidated by writes.
    /// Skipped by serde, so it is never written out with the store.
    #[serde(skip)]
    pub(super) csr_snapshot: Option<CsrSnapshot>,
}

impl EdgeStore {
    /// Creates a new empty edge store.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an edge store with pre-allocated capacity for better performance.
    ///
    /// Pre-allocating reduces memory reallocation overhead when inserting many edges.
    /// With 10M edges, this can reduce peak memory usage by ~2x and improve insert throughput.
    ///
    /// # Arguments
    ///
    /// * `expected_edges` - Expected number of edges to store
    /// * `expected_nodes` - Expected number of unique nodes (sources + targets)
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// // For a graph with ~1M edges and ~100K nodes
    /// let store = EdgeStore::with_capacity(1_000_000, 100_000);
    /// ```
    #[must_use]
    pub fn with_capacity(expected_edges: usize, expected_nodes: usize) -> Self {
        // Estimate ~10 unique labels typical for knowledge graphs
        let expected_labels = 10usize;
        // Saturating arithmetic keeps the estimate from overflowing on extreme inputs.
        let outgoing_by_label_cap = expected_nodes
            .saturating_mul(expected_labels)
            .saturating_div(10);
        Self {
            edges: HashMap::with_capacity(expected_edges),
            outgoing: HashMap::with_capacity(expected_nodes),
            incoming: HashMap::with_capacity(expected_nodes),
            by_label: HashMap::with_capacity(expected_labels),
            outgoing_by_label: HashMap::with_capacity(outgoing_by_label_cap),
            csr_snapshot: None,
        }
    }

    /// Adds an edge to the store.
    ///
    /// Creates bidirectional index entries for efficient traversal.
    /// Also maintains label-based secondary indices (EPIC-019 US-003).
    ///
    /// # Errors
    ///
    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
    pub fn add_edge(&mut self, edge: GraphEdge) -> Result<()> {
        self.insert_edge(edge, true, true)
    }

    /// Adds an edge with only the outgoing index (for cross-shard storage).
    ///
    /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
    /// The edge is stored and indexed by source node (and by label) only.
    ///
    /// # Errors
    ///
    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
    pub fn add_edge_outgoing_only(&mut self, edge: GraphEdge) -> Result<()> {
        self.insert_edge(edge, true, false)
    }

    /// Adds an edge with only the incoming index (for cross-shard storage).
    ///
    /// Used by `ConcurrentEdgeStore` when source and target are in different shards.
    /// The edge is stored and indexed by target node only.
    /// Note: Label indices are maintained by the source shard in `ConcurrentEdgeStore`,
    /// so this method does not touch `by_label` or `outgoing_by_label`.
    ///
    /// # Errors
    ///
    /// Returns `Error::EdgeExists` if an edge with the same ID already exists.
    pub fn add_edge_incoming_only(&mut self, edge: GraphEdge) -> Result<()> {
        self.insert_edge(edge, false, true)
    }

    /// Shared implementation for all `add_edge*` variants.
    ///
    /// Validates uniqueness, populates the requested directional indices,
    /// and stores the edge. Label indices (`by_label`, `outgoing_by_label`)
    /// are maintained only when `index_outgoing` is `true` (source shard
    /// owns label indices in the concurrent model).
    ///
    /// The uniqueness check runs before any index is modified, so a rejected
    /// insert leaves the store untouched.
    fn insert_edge(
        &mut self,
        edge: GraphEdge,
        index_outgoing: bool,
        index_incoming: bool,
    ) -> Result<()> {
        let id = edge.id();
        if self.edges.contains_key(&id) {
            return Err(Error::EdgeExists(id));
        }

        if index_outgoing {
            let source = edge.source();
            let label = edge.label().to_string();
            self.outgoing.entry(source).or_default().push(id);
            // Label indices are owned by the source shard (US-003)
            self.by_label.entry(label.clone()).or_default().push(id);
            self.outgoing_by_label
                .entry((source, label))
                .or_default()
                .push(id);
        }

        if index_incoming {
            self.incoming.entry(edge.target()).or_default().push(id);
        }

        self.edges.insert(id, edge);
        // Invalidate CSR snapshot — writes make it stale (G1).
        self.csr_snapshot = None;
        Ok(())
    }

    /// Returns the total number of edges in the store.
    ///
    /// Equivalent to [`len`](Self::len).
    #[must_use]
    pub fn edge_count(&self) -> usize {
        self.len()
    }

    /// Returns the count of edges where this shard is the source (for accurate cross-shard counting).
    ///
    /// Computed as the sum of the lengths of all per-source lists in the outgoing index.
    #[must_use]
    pub fn outgoing_edge_count(&self) -> usize {
        self.outgoing.values().map(Vec::len).sum()
    }

    /// Gets an edge by its ID.
    #[must_use]
    pub fn get_edge(&self, id: u64) -> Option<&GraphEdge> {
        self.edges.get(&id)
    }

    /// Gets all outgoing edges from a node.
    ///
    /// Returns an empty `Vec` when the node has no outgoing index entry.
    #[must_use]
    pub fn get_outgoing(&self, node_id: u64) -> Vec<&GraphEdge> {
        self.resolve_edge_ids(self.outgoing.get(&node_id))
    }

    /// Invokes `f` for each outgoing edge from `node_id` without allocating a `Vec`.
    ///
    /// Prefer this over [`get_outgoing`](Self::get_outgoing) in hot loops (e.g. BFS
    /// frontiers) where the caller processes edges inline rather than collecting them.
    /// Index entries whose edge ID is missing from the edge map are skipped.
    #[inline]
    pub fn for_each_outgoing<F: FnMut(&GraphEdge)>(&self, node_id: u64, mut f: F) {
        if let Some(ids) = self.outgoing.get(&node_id) {
            for id in ids {
                if let Some(edge) = self.edges.get(id) {
                    f(edge);
                }
            }
        }
    }

    /// Returns the number of outgoing edges from `node_id` without materializing them.
    #[must_use]
    #[inline]
    pub fn outgoing_degree(&self, node_id: u64) -> usize {
        self.outgoing.get(&node_id).map_or(0, Vec::len)
    }

    /// Returns the number of incoming edges to `node_id` without materializing them.
    #[must_use]
    #[inline]
    pub fn incoming_degree(&self, node_id: u64) -> usize {
        self.incoming.get(&node_id).map_or(0, Vec::len)
    }

    /// Gets all incoming edges to a node.
    ///
    /// Returns an empty `Vec` when the node has no incoming index entry.
    #[must_use]
    pub fn get_incoming(&self, node_id: u64) -> Vec<&GraphEdge> {
        self.resolve_edge_ids(self.incoming.get(&node_id))
    }

    /// Gets outgoing edges filtered by label using composite index - O(k) where k = result count.
    ///
    /// Uses the `outgoing_by_label` composite index for fast lookup instead of
    /// iterating through all outgoing edges (EPIC-019 US-003).
    #[must_use]
    pub fn get_outgoing_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
        // The index key is an owned `(u64, String)` tuple, so the lookup needs
        // an owned copy of `label` to build a matching key.
        self.resolve_edge_ids(self.outgoing_by_label.get(&(node_id, label.to_string())))
    }

    /// Gets all edges with a specific label - O(k) where k = result count.
    ///
    /// Uses the `by_label` secondary index for fast lookup (EPIC-019 US-003).
    #[must_use]
    pub fn get_edges_by_label(&self, label: &str) -> Vec<&GraphEdge> {
        self.resolve_edge_ids(self.by_label.get(label))
    }

    /// Resolves edge IDs from an index entry into edge references.
    ///
    /// Shared lookup pattern used by `get_outgoing`, `get_incoming`,
    /// `get_outgoing_by_label`, and `get_edges_by_label`. A missing index
    /// entry (`None`) yields an empty `Vec`; IDs with no matching edge are skipped.
    #[inline]
    fn resolve_edge_ids(&self, ids: Option<&Vec<u64>>) -> Vec<&GraphEdge> {
        ids.map(|ids| ids.iter().filter_map(|id| self.edges.get(id)).collect())
            .unwrap_or_default()
    }

    /// Gets incoming edges filtered by label.
    ///
    /// There is no composite incoming/label index, so this walks the node's
    /// incoming edge list and keeps the edges whose label equals `label`.
    #[must_use]
    pub fn get_incoming_by_label(&self, node_id: u64, label: &str) -> Vec<&GraphEdge> {
        // Filter lazily in one pass instead of materializing every incoming
        // edge into a temporary `Vec` first.
        self.incoming
            .get(&node_id)
            .into_iter()
            .flatten()
            .filter_map(|id| self.edges.get(id))
            .filter(|edge| edge.label() == label)
            .collect()
    }

    /// Checks if an edge with the given ID exists.
    #[must_use]
    pub fn contains_edge(&self, edge_id: u64) -> bool {
        self.edges.contains_key(&edge_id)
    }

    /// Returns the number of edges in the store.
    #[must_use]
    pub fn len(&self) -> usize {
        self.edges.len()
    }

    /// Returns true if the store contains no edges.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.edges.is_empty()
    }

    /// Returns all edges in the store.
    ///
    /// The edges come from a `HashMap`, so their order is unspecified.
    #[must_use]
    pub fn all_edges(&self) -> Vec<&GraphEdge> {
        self.edges.values().collect()
    }

    /// Returns all outgoing source node IDs (keys of the outgoing index).
    ///
    /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) to
    /// enumerate source nodes for CSR construction.
    #[must_use]
    pub(crate) fn outgoing_keys(&self) -> Vec<u64> {
        self.outgoing.keys().copied().collect()
    }

    /// Returns the total number of outgoing edge entries across all nodes.
    ///
    /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) for
    /// pre-allocation. Same value as [`outgoing_edge_count`](Self::outgoing_edge_count).
    #[must_use]
    pub(crate) fn total_outgoing_edges(&self) -> usize {
        self.outgoing_edge_count()
    }

    /// Invokes `f` for each outgoing edge from `node_id` (by edge object).
    ///
    /// Used by [`SnapshotBuilder`](super::csr_snapshot::SnapshotBuilder) to
    /// iterate edges without exposing internal index structure. Delegates to
    /// [`for_each_outgoing`](Self::for_each_outgoing) so both share one traversal.
    pub(crate) fn for_each_outgoing_edge<F: FnMut(&GraphEdge)>(&self, node_id: u64, f: F) {
        self.for_each_outgoing(node_id, f);
    }
}

// Edge removal operations are in `edge_removal.rs`.
// CSR snapshot methods and persistence are in `edge_persistence.rs`.