1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
//! Core adjacency iterator implementation
use crate::backend::native::graph_file::GraphFile;
use crate::backend::native::node_store::NodeStore;
use crate::backend::native::optimizations::*;
use crate::backend::native::types::*;
use super::Direction;
// Import instrumentation when in debug mode
#[cfg(debug_assertions)]
use super::instrumentation::convenience::{
start_timing, track_iteration, validate_state,
};
#[cfg(debug_assertions)]
use log::error;
/// Adjacency iterator for efficient neighbor traversal
///
/// Walks the neighbors of one node in a single direction. Iteration
/// position is tracked by `current_index` against `total_count`; the
/// optional cache fields avoid repeated node deserialization and file
/// I/O. Neighbor resolution prefers `cached_clustered_neighbors` (V2),
/// then `edge_offsets`, as implemented in `get_current_neighbor`.
pub struct AdjacencyIterator<'a> {
/// Graph file reference for I/O operations
pub(crate) graph_file: &'a mut GraphFile,
/// Target node identifier for adjacency traversal
pub(crate) node_id: NativeNodeId,
/// Traversal direction (outgoing or incoming edges)
pub(crate) direction: Direction,
/// Optional edge type filter for iteration; `None` means no filtering
pub(crate) edge_filter: Option<Vec<String>>,
/// Current iteration position index (0-based, advanced by callers)
pub(crate) current_index: u32,
/// Total number of neighbors available (before any filtering)
pub(crate) total_count: u32,
/// Cached node metadata to avoid repeated deserialization
/// (populated only on the slow construction path)
pub(crate) cached_node: Option<NodeRecord>,
/// Pre-computed edge offsets from neighbor pointer table (fast path)
pub(crate) edge_offsets: Option<Vec<FileOffset>>,
/// Hot node metadata for fast adjacency operations
pub(crate) node_hot: Option<NodeHot>,
/// V2 Clustered adjacency: cached neighbors for sequential I/O;
/// `None` until lazily initialized on first neighbor access
pub(crate) cached_clustered_neighbors: Option<Vec<NativeNodeId>>,
}
impl<'a> AdjacencyIterator<'a> {
/// Create a copy of the iterator at the same position
///
/// Always fails: duplicating the iterator would require a second mutable
/// reference to the same `graph_file`, which the current design cannot
/// provide. The `BufferTooSmall { size: 0, min_size: 1 }` error is used
/// as a placeholder sentinel for "unsupported operation"; callers should
/// not attach meaning to this specific variant.
/// TODO(review): a dedicated "unsupported" error variant would be clearer
/// if one exists in `NativeBackendError`.
pub fn copy_iterator(&self) -> NativeResult<Self> {
// We can't actually copy since we'd need a mutable reference to the same graph_file
// This is a limitation of the current design
Err(NativeBackendError::BufferTooSmall {
size: 0,
min_size: 1,
})
}
/// Create a new adjacency iterator for outgoing neighbors
///
/// Prefers cheap cached metadata: when both the hot node data and the
/// outgoing edge pointer table are available, the full node record is
/// never read. Otherwise the node is deserialized from `graph_file` to
/// obtain the outgoing edge count.
pub fn new_outgoing(
    graph_file: &'a mut GraphFile,
    node_id: NativeNodeId,
) -> NativeResult<Self> {
    // Cheap lookups first: hot metadata and the outgoing pointer table.
    let node_hot = get_node_hot(node_id);
    let edge_offsets = get_outgoing_edge_offsets(node_id);
    // Fast path needs both caches; any miss forces a full node read.
    let (cached_node, total_count) = match (&node_hot, &edge_offsets) {
        (Some(hot), Some(_)) => (None, hot.outgoing_edge_count),
        _ => {
            let mut node_store = NodeStore::new(graph_file);
            let node = node_store.read_node(node_id)?;
            let count = node.outgoing_edge_count;
            (Some(node), count)
        }
    };
    Ok(Self {
        graph_file,
        node_id,
        direction: Direction::Outgoing,
        edge_filter: None,
        current_index: 0,
        total_count,
        cached_node,
        edge_offsets,
        node_hot,
        cached_clustered_neighbors: None,
    })
}
/// Create a new adjacency iterator for incoming neighbors
///
/// Prefers cheap cached metadata: when both the hot node data and the
/// incoming edge pointer table are available, the full node record is
/// never read. Otherwise the node is deserialized from `graph_file` to
/// obtain the incoming edge count.
pub fn new_incoming(
    graph_file: &'a mut GraphFile,
    node_id: NativeNodeId,
) -> NativeResult<Self> {
    // Cheap lookups first: hot metadata and the incoming pointer table.
    let node_hot = get_node_hot(node_id);
    let edge_offsets = get_incoming_edge_offsets(node_id);
    // Fast path needs both caches; any miss forces a full node read.
    let (cached_node, total_count) = match (&node_hot, &edge_offsets) {
        (Some(hot), Some(_)) => (None, hot.incoming_edge_count),
        _ => {
            let mut node_store = NodeStore::new(graph_file);
            let node = node_store.read_node(node_id)?;
            let count = node.incoming_edge_count;
            (Some(node), count)
        }
    };
    Ok(Self {
        graph_file,
        node_id,
        direction: Direction::Incoming,
        edge_filter: None,
        current_index: 0,
        total_count,
        cached_node,
        edge_offsets,
        node_hot,
        cached_clustered_neighbors: None,
    })
}
/// Set edge type filter for iteration
///
/// Builder-style: consumes the iterator and returns it with the filter
/// installed. Each borrowed `&str` is copied into an owned `String`.
pub fn with_edge_filter(mut self, edge_types: &[&str]) -> Self {
    let owned: Vec<String> = edge_types.iter().map(|s| s.to_string()).collect();
    self.edge_filter = Some(owned);
    self
}
/// Get the total number of neighbors (before filtering)
#[inline(always)]
pub fn total_count(&self) -> u32 {
self.total_count
}
/// Get the current iteration position (0-based)
#[inline(always)]
pub fn current_index(&self) -> u32 {
self.current_index
}
/// Check if iteration is complete
///
/// True once the position has reached (or passed) `total_count`.
#[inline(always)]
pub fn is_complete(&self) -> bool {
self.current_index >= self.total_count
}
/// Reset iterator to beginning
///
/// Only rewinds the position; cached neighbors/offsets are kept.
pub fn reset(&mut self) {
self.current_index = 0;
}
/// Get neighbor node ID at current position (optimized with pointer table and hot cache)
///
/// Resolution order:
/// 1. V2 clustered adjacency cache (lazily initialized on first call) —
///    highest priority.
/// 2. Pre-computed edge offsets ("fast path" — a V2 stub that yields none).
/// 3. Otherwise `Ok(None)`; there is deliberately no V1 fallback.
///
/// Does NOT advance `current_index`; callers advance the position
/// themselves (see `collect`, `contains`, `get_batch`).
#[inline]
pub fn get_current_neighbor(&mut self) -> NativeResult<Option<NativeNodeId>> {
    // Instrumentation: Track iterations for infinite loop detection
    #[cfg(debug_assertions)]
    {
        if !track_iteration(self.node_id as u32) {
            error!(
                "Stopping iteration due to infinite loop detection for node {}",
                self.node_id
            );
            return Ok(None);
        }
    }
    // Validate current state consistency (debug builds only; failures are
    // logged but do not abort iteration).
    #[cfg(debug_assertions)]
    {
        let validation_report = validate_state(
            self.node_id as u32,
            self.current_index,
            self.total_count,
            self.cached_clustered_neighbors.as_ref().map(|n| n.len()),
        );
        if !validation_report.is_valid() {
            error!("Iterator state validation failed for node {}", self.node_id);
        }
    }
    // V2 Clustered Adjacency Path: lazily initialize the clustered-neighbor
    // cache on first use (HIGHEST PRIORITY source of neighbors).
    if self.cached_clustered_neighbors.is_none() {
        #[cfg(debug_assertions)]
        let _timing = start_timing("try_initialize_clustered_adjacency");
        // Initialization errors are cached inside
        // try_initialize_clustered_adjacency() so repeated attempts are
        // avoided: on failure cached_clustered_neighbors is expected to be
        // Some(empty) with total_count = 0. The error itself is therefore
        // intentionally discarded here.
        let _ = self.try_initialize_clustered_adjacency();
    }
    if let Some(ref neighbors) = self.cached_clustered_neighbors {
        // Bounds-checked positional lookup; None past the end.
        return Ok(neighbors.get(self.current_index as usize).copied());
    }
    // Fast path: use the pointer table if available. take()/restore works
    // around borrowing self mutably while also borrowing the offsets.
    if let Some(edge_offsets) = self.edge_offsets.take() {
        let result = self.get_current_neighbor_fast_path(&edge_offsets);
        self.edge_offsets = Some(edge_offsets); // Put it back
        return result;
    }
    // V2-only: No V1 fallback - return no neighbors if V2 clustering unavailable
    Ok(None)
}
/// V2-only: Fast path disabled - use V2 clustered adjacency only
/// In V2, individual edge reading is replaced by cluster-based reading
///
/// The `_edge_offsets` argument is accepted for signature compatibility
/// with the caller (`get_current_neighbor`) but is never inspected; this
/// stub always reports that no neighbor is available.
#[inline(always)]
pub(crate) fn get_current_neighbor_fast_path(
&mut self,
_edge_offsets: &[FileOffset],
) -> NativeResult<Option<NativeNodeId>> {
// V2-only: Fast path disabled - rely on V2 clustered adjacency
// Individual edge reading is not used in V2 architecture
Ok(None)
}
/// Collect all neighbors into a vector
///
/// Consumes the iterator, walks every remaining position, and returns the
/// unique neighbor IDs in first-seen order. Deduplication is applied here,
/// at the public API layer, preserving full edge multiplicity internally.
/// An unexpected `None` before `total_count` is reached terminates the
/// walk early rather than looping forever.
pub fn collect(mut self) -> NativeResult<Vec<NativeNodeId>> {
    #[cfg(debug_assertions)]
    let _timing = start_timing("adjacency_collect_operation");
    // Reserve for the remaining positions (saturating: guards against a
    // position already past total_count).
    let remaining = self.total_count.saturating_sub(self.current_index) as usize;
    let mut neighbors = Vec::with_capacity(remaining);
    while !self.is_complete() {
        match self.get_current_neighbor()? {
            Some(neighbor) => {
                neighbors.push(neighbor);
                self.current_index += 1;
            }
            None => {
                // Inconsistency detected - force termination
                break;
            }
        }
    }
    // neighbors() must return unique neighbor IDs. Deduplicate in place,
    // preserving first-seen order: `insert` returns false for IDs already
    // seen, so duplicates are dropped without cloning the whole Vec.
    let mut seen = std::collections::HashSet::new();
    neighbors.retain(|&id| seen.insert(id));
    Ok(neighbors)
}
/// Check if a specific neighbor exists
///
/// Performs a linear scan from the beginning, then restores the iterator
/// to whatever position it held before the call. Note that an I/O error
/// propagated via `?` returns early without restoring the position.
pub fn contains(&mut self, target_id: NativeNodeId) -> NativeResult<bool> {
    // Remember where the caller was, then rewind for a full scan.
    let saved_index = self.current_index;
    self.current_index = 0;
    let mut found = false;
    while !self.is_complete() {
        if self.get_current_neighbor()? == Some(target_id) {
            found = true;
            break;
        }
        self.current_index += 1;
    }
    // Single exit point: restore the caller's position either way.
    self.current_index = saved_index;
    Ok(found)
}
/// Get neighbors in batches
///
/// Returns up to `batch_size` neighbors starting at the current position
/// and advances `current_index` past the positions examined. Positions
/// that yield no neighbor are skipped, so the returned batch may be
/// shorter than `batch_size` even before the end of iteration.
pub fn get_batch(&mut self, batch_size: u32) -> NativeResult<Vec<NativeNodeId>> {
    let mut batch = Vec::with_capacity(batch_size as usize);
    // saturating_add prevents u32 overflow (debug panic / release wrap)
    // when current_index + batch_size would exceed u32::MAX; min clamps
    // the scan to the known neighbor count.
    let end_index = self
        .current_index
        .saturating_add(batch_size)
        .min(self.total_count);
    while self.current_index < end_index {
        if let Some(neighbor) = self.get_current_neighbor()? {
            batch.push(neighbor);
        }
        self.current_index += 1;
    }
    Ok(batch)
}
}