1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
//! V2 clustered adjacency kernel implementation
use crate::backend::native::edge_store::EdgeStore;
use crate::backend::native::node_store::NodeStore;
use crate::backend::native::types::*;
use super::Direction;
// Import instrumentation when in debug mode
#[cfg(debug_assertions)]
use super::instrumentation::convenience::{start_timing, track_v2_read};
impl super::AdjacencyIterator<'_> {
// ========================================
// V2 CLUSTERED ADJACENCY KERNEL IMPLEMENTATION
// ========================================
/// V2 clustered adjacency with proper error handling
/// Uses single clustered read and properly distinguishes between "no cluster" and "corrupt cluster"
/// FIXED: Cache both success and failure results to prevent repeated initialization attempts
pub fn try_initialize_clustered_adjacency(&mut self) -> NativeResult<()> {
// FIXED: Return early if already attempted (prevent infinite loops)
if self.cached_clustered_neighbors.is_some() {
return Ok(());
}
// First, check if node is V2 format with cluster metadata
{
#[cfg(debug_assertions)]
let _timing = start_timing("v2_cluster_metadata_check");
let node_data_offset = self.graph_file.persistent_header().node_data_offset;
let slot_offset = node_data_offset + ((self.node_id - 1) as u64 * 4096);
let mut version = [0u8; 1];
// V2-only: Check node format (V1 support removed)
match self.graph_file.read_bytes(slot_offset, &mut version) {
Ok(()) => {
#[cfg(debug_assertions)]
track_v2_read(self.node_id as u32);
if version[0] == 2 {
// V2 node detected - try to read cluster metadata
let mut node_store = NodeStore::new(self.graph_file);
match node_store.read_node_v2(self.node_id) {
Ok(node_v2) => {
drop(node_store);
let (cluster_offset, cluster_size, edge_count) =
match self.direction {
Direction::Outgoing => (
node_v2.outgoing_cluster_offset,
node_v2.outgoing_cluster_size,
node_v2.outgoing_edge_count,
),
Direction::Incoming => (
node_v2.incoming_cluster_offset,
node_v2.incoming_cluster_size,
node_v2.incoming_edge_count,
),
};
// Phase 35: Only proceed if cluster metadata is complete
if cluster_offset > 0 && cluster_size > 0 && edge_count > 0 {
#[cfg(debug_assertions)]
let _cluster_timing =
start_timing("v2_cluster_neighbor_iteration");
// Phase 69: Read V2 edge cluster directly (avoid circular dependency)
let neighbors = match self
.read_v2_edge_cluster_directly(&node_v2)
{
Ok(neighbors) => neighbors,
Err(_e) => {
// Fallback: use edge store to traverse edges directly
let mut edge_store = EdgeStore::new(self.graph_file);
edge_store
.iter_neighbors(self.node_id, self.direction)
.collect::<Vec<_>>()
}
};
// Phase 69: V2 clustered adjacency success
self.cached_clustered_neighbors = Some(neighbors);
self.total_count = edge_count;
return Ok(());
}
}
Err(NativeBackendError::InvalidNodeId { .. }) => {
// Node doesn't exist - cache empty result and propagate error
#[cfg(debug_assertions)]
track_v2_read(self.node_id as u32);
self.cached_clustered_neighbors = Some(Vec::new());
self.total_count = 0; // CRITICAL: Update total_count to match empty result
return Err(NativeBackendError::InvalidNodeId { id: 0, max_id: 0 });
}
Err(e) => {
// Phase 35: Propagate unexpected read errors, cache empty result
#[cfg(debug_assertions)]
track_v2_read(self.node_id as u32);
self.cached_clustered_neighbors = Some(Vec::new());
self.total_count = 0; // CRITICAL: Update total_count to match empty result
return Err(e);
}
}
}
}
Err(NativeBackendError::FileTooSmall { .. }) => {
// Node slot out of bounds - cache empty result and return error
self.cached_clustered_neighbors = Some(Vec::new());
self.total_count = 0; // CRITICAL: Update total_count to match empty result
return Err(NativeBackendError::FileTooSmall {
size: 0,
min_size: 1,
});
}
Err(e) => {
// Phase 35: Propagate unexpected I/O errors, cache empty result
self.cached_clustered_neighbors = Some(Vec::new());
self.total_count = 0; // CRITICAL: Update total_count to match empty result
return Err(e);
}
}
}
// V2-ONLY: Return error if V2 cluster not found
// FIXED: Cache empty result to prevent repeated initialization attempts
let error = NativeBackendError::CorruptNodeRecord {
node_id: self.node_id as i64,
reason: "V2 cluster metadata not found".to_string(),
};
self.cached_clustered_neighbors = Some(Vec::new()); // Cache empty result
self.total_count = 0; // CRITICAL: Update total_count to match empty result
Err(error)
}
/// Read V2 edge cluster directly without going through AdjacencyIterator
/// This avoids the circular dependency where AdjacencyIterator calls edge_store.iter_neighbors()
/// which creates another AdjacencyIterator
fn read_v2_edge_cluster_directly(
&mut self,
node_v2: &crate::backend::native::v2::node_record_v2::NodeRecordV2,
) -> NativeResult<Vec<NativeNodeId>> {
use crate::backend::native::v2::edge_cluster::EdgeCluster;
let (cluster_offset, cluster_size) = match self.direction {
Direction::Outgoing => (
node_v2.outgoing_cluster_offset,
node_v2.outgoing_cluster_size,
),
Direction::Incoming => (
node_v2.incoming_cluster_offset,
node_v2.incoming_cluster_size,
),
};
// If no cluster metadata, return empty result
if cluster_offset == 0 || cluster_size == 0 {
return Ok(Vec::new());
}
// Read cluster data directly from file
let mut cluster_data = vec![0u8; cluster_size as usize];
self.graph_file
.read_bytes(cluster_offset, &mut cluster_data)?;
// Check if cluster data contains actual data (not all zeros)
if cluster_data.iter().all(|&byte| byte == 0) {
return Ok(Vec::new());
}
// Try to deserialize the cluster
match EdgeCluster::deserialize(&cluster_data) {
Ok(cluster) => {
let neighbors: Vec<NativeNodeId> = cluster
.iter_neighbors()
.map(|id| id as NativeNodeId)
.collect();
Ok(neighbors)
}
Err(e) => {
// This can happen if cluster metadata exists but no actual cluster was written
// Fall back to edge store traversal
Err(e)
}
}
}
}