sqlitegraph/backend/native/adjacency/v2_clustered.rs
1//! V2 clustered adjacency kernel implementation
2
3use crate::backend::native::edge_store::EdgeStore;
4use crate::backend::native::node_store::NodeStore;
5use crate::backend::native::types::*;
6
7use super::Direction;
8
9// Import instrumentation when in debug mode
10#[cfg(debug_assertions)]
11use super::instrumentation::convenience::{get_metrics, start_timing, track_v2_read};
12
13impl super::AdjacencyIterator<'_> {
14 // ========================================
15 // V2 CLUSTERED ADJACENCY KERNEL IMPLEMENTATION
16 // ========================================
17
18 /// V2 clustered adjacency with proper error handling
19 /// Uses single clustered read and properly distinguishes between "no cluster" and "corrupt cluster"
20 /// FIXED: Cache both success and failure results to prevent repeated initialization attempts
21 pub fn try_initialize_clustered_adjacency(&mut self) -> NativeResult<()> {
22 // FIXED: Return early if already attempted (prevent infinite loops)
23 if self.cached_clustered_neighbors.is_some() {
24 return Ok(());
25 }
26
27 // First, check if node is V2 format with cluster metadata
28 {
29 #[cfg(debug_assertions)]
30 let _timing = start_timing("v2_cluster_metadata_check");
31
32 let node_data_offset = self.graph_file.persistent_header().node_data_offset;
33 let slot_offset = node_data_offset + ((self.node_id - 1) as u64 * 4096);
34 let mut version = [0u8; 1];
35
36 // V2-only: Check node format (V1 support removed)
37 match self.graph_file.read_bytes(slot_offset, &mut version) {
38 Ok(()) => {
39 #[cfg(debug_assertions)]
40 track_v2_read(self.node_id as u32);
41
42 if version[0] == 2 {
43 // V2 node detected - try to read cluster metadata
44 let mut node_store = NodeStore::new(self.graph_file);
45 match node_store.read_node_v2(self.node_id) {
46 Ok(node_v2) => {
47 drop(node_store);
48
49 let (cluster_offset, cluster_size, edge_count) =
50 match self.direction {
51 Direction::Outgoing => (
52 node_v2.outgoing_cluster_offset,
53 node_v2.outgoing_cluster_size,
54 node_v2.outgoing_edge_count,
55 ),
56 Direction::Incoming => (
57 node_v2.incoming_cluster_offset,
58 node_v2.incoming_cluster_size,
59 node_v2.incoming_edge_count,
60 ),
61 };
62
63 // Phase 35: Only proceed if cluster metadata is complete
64 if cluster_offset > 0 && cluster_size > 0 && edge_count > 0 {
65 #[cfg(debug_assertions)]
66 let _cluster_timing =
67 start_timing("v2_cluster_neighbor_iteration");
68
69 // Phase 69: Read V2 edge cluster directly (avoid circular dependency)
70 let neighbors = match self
71 .read_v2_edge_cluster_directly(&node_v2)
72 {
73 Ok(neighbors) => neighbors,
74 Err(e) => {
75 #[cfg(debug_assertions)]
76 println!(
77 "DEBUG: V2 cluster read failed for node {}: {}, falling back to edge store traversal",
78 self.node_id, e
79 );
80
81 // Fallback: use edge store to traverse edges directly
82 let mut edge_store = EdgeStore::new(self.graph_file);
83 edge_store
84 .iter_neighbors(self.node_id, self.direction)
85 .collect::<Vec<_>>()
86 }
87 };
88
89 // Phase 69: V2 clustered adjacency success
90 #[cfg(debug_assertions)]
91 {
92 println!(
93 "DEBUG: V2 clustered adjacency SUCCESS for node {} (direction: {:?}, {} neighbors)",
94 self.node_id,
95 self.direction,
96 neighbors.len()
97 );
98
99 // Log metrics snapshot
100 let metrics = get_metrics();
101 println!(
102 "DEBUG: Metrics snapshot - iterations: {}, v2_reads: {}, loop_detections: {}",
103 metrics.total_iterations,
104 metrics.total_v2_reads,
105 metrics.infinite_loop_detections
106 );
107 }
108 self.cached_clustered_neighbors = Some(neighbors);
109 self.total_count = edge_count;
110 return Ok(());
111 }
112 }
113 Err(NativeBackendError::InvalidNodeId { .. }) => {
114 // Node doesn't exist - cache empty result and propagate error
115 #[cfg(debug_assertions)]
116 track_v2_read(self.node_id as u32);
117
118 self.cached_clustered_neighbors = Some(Vec::new());
119 self.total_count = 0; // CRITICAL: Update total_count to match empty result
120 return Err(NativeBackendError::InvalidNodeId { id: 0, max_id: 0 });
121 }
122 Err(e) => {
123 // Phase 35: Propagate unexpected read errors, cache empty result
124 #[cfg(debug_assertions)]
125 track_v2_read(self.node_id as u32);
126
127 self.cached_clustered_neighbors = Some(Vec::new());
128 self.total_count = 0; // CRITICAL: Update total_count to match empty result
129 return Err(e);
130 }
131 }
132 }
133 }
134 Err(NativeBackendError::FileTooSmall { .. }) => {
135 // Node slot out of bounds - cache empty result and return error
136 self.cached_clustered_neighbors = Some(Vec::new());
137 self.total_count = 0; // CRITICAL: Update total_count to match empty result
138 return Err(NativeBackendError::FileTooSmall {
139 size: 0,
140 min_size: 1,
141 });
142 }
143 Err(e) => {
144 // Phase 35: Propagate unexpected I/O errors, cache empty result
145 self.cached_clustered_neighbors = Some(Vec::new());
146 self.total_count = 0; // CRITICAL: Update total_count to match empty result
147 return Err(e);
148 }
149 }
150 }
151
152 // V2-ONLY: Return error if V2 cluster not found
153 // FIXED: Cache empty result to prevent repeated initialization attempts
154 #[cfg(debug_assertions)]
155 {
156 println!(
157 "DEBUG: V2 clustered adjacency FAILED for node {} - cluster metadata not found",
158 self.node_id
159 );
160 let metrics = get_metrics();
161 println!(
162 "DEBUG: Final metrics - iterations: {}, v2_reads: {}, loop_detections: {}",
163 metrics.total_iterations, metrics.total_v2_reads, metrics.infinite_loop_detections
164 );
165 }
166
167 let error = NativeBackendError::CorruptNodeRecord {
168 node_id: self.node_id as i64,
169 reason: "V2 cluster metadata not found".to_string(),
170 };
171 self.cached_clustered_neighbors = Some(Vec::new()); // Cache empty result
172 self.total_count = 0; // CRITICAL: Update total_count to match empty result
173 Err(error)
174 }
175
176 /// Read V2 edge cluster directly without going through AdjacencyIterator
177 /// This avoids the circular dependency where AdjacencyIterator calls edge_store.iter_neighbors()
178 /// which creates another AdjacencyIterator
179 fn read_v2_edge_cluster_directly(
180 &mut self,
181 node_v2: &crate::backend::native::v2::node_record_v2::NodeRecordV2,
182 ) -> NativeResult<Vec<NativeNodeId>> {
183 use crate::backend::native::v2::edge_cluster::EdgeCluster;
184
185 let (cluster_offset, cluster_size) = match self.direction {
186 Direction::Outgoing => (
187 node_v2.outgoing_cluster_offset,
188 node_v2.outgoing_cluster_size,
189 ),
190 Direction::Incoming => (
191 node_v2.incoming_cluster_offset,
192 node_v2.incoming_cluster_size,
193 ),
194 };
195
196 // If no cluster metadata, return empty result
197 if cluster_offset == 0 || cluster_size == 0 {
198 return Ok(Vec::new());
199 }
200
201 #[cfg(debug_assertions)]
202 println!(
203 "DEBUG: Reading V2 cluster for node {} at offset {}, size {}",
204 self.node_id, cluster_offset, cluster_size
205 );
206
207 // Read cluster data directly from file
208 let mut cluster_data = vec![0u8; cluster_size as usize];
209 self.graph_file
210 .read_bytes(cluster_offset, &mut cluster_data)?;
211
212 // Check if cluster data contains actual data (not all zeros)
213 if cluster_data.iter().all(|&byte| byte == 0) {
214 #[cfg(debug_assertions)]
215 println!("DEBUG: V2 cluster data is all zeros - no edge cluster was written");
216 return Ok(Vec::new());
217 }
218
219 // Try to deserialize the cluster
220 match EdgeCluster::deserialize(&cluster_data) {
221 Ok(cluster) => {
222 let neighbors: Vec<NativeNodeId> = cluster
223 .iter_neighbors()
224 .map(|id| id as NativeNodeId)
225 .collect();
226
227 #[cfg(debug_assertions)]
228 {
229 println!(
230 "DEBUG: Direct V2 cluster read for node {} (direction: {:?}) - found {} neighbors",
231 self.node_id,
232 self.direction,
233 neighbors.len()
234 );
235 }
236
237 Ok(neighbors)
238 }
239 Err(e) => {
240 #[cfg(debug_assertions)]
241 println!(
242 "DEBUG: Failed to deserialize V2 cluster for node {}: {}",
243 self.node_id, e
244 );
245
246 // This can happen if cluster metadata exists but no actual cluster was written
247 // Fall back to edge store traversal
248 Err(e)
249 }
250 }
251 }
252}