Skip to main content

sqlitegraph/backend/native/adjacency/
v2_clustered.rs

1//! V2 clustered adjacency kernel implementation
2
3use crate::backend::native::edge_store::EdgeStore;
4use crate::backend::native::node_store::NodeStore;
5use crate::backend::native::types::*;
6
7use super::Direction;
8
9// Import instrumentation when in debug mode
10#[cfg(debug_assertions)]
11use super::instrumentation::convenience::{get_metrics, start_timing, track_v2_read};
12
13impl super::AdjacencyIterator<'_> {
14    // ========================================
15    // V2 CLUSTERED ADJACENCY KERNEL IMPLEMENTATION
16    // ========================================
17
18    /// V2 clustered adjacency with proper error handling
19    /// Uses single clustered read and properly distinguishes between "no cluster" and "corrupt cluster"
20    /// FIXED: Cache both success and failure results to prevent repeated initialization attempts
21    pub fn try_initialize_clustered_adjacency(&mut self) -> NativeResult<()> {
22        // FIXED: Return early if already attempted (prevent infinite loops)
23        if self.cached_clustered_neighbors.is_some() {
24            return Ok(());
25        }
26
27        // First, check if node is V2 format with cluster metadata
28        {
29            #[cfg(debug_assertions)]
30            let _timing = start_timing("v2_cluster_metadata_check");
31
32            let node_data_offset = self.graph_file.persistent_header().node_data_offset;
33            let slot_offset = node_data_offset + ((self.node_id - 1) as u64 * 4096);
34            let mut version = [0u8; 1];
35
36            // V2-only: Check node format (V1 support removed)
37            match self.graph_file.read_bytes(slot_offset, &mut version) {
38                Ok(()) => {
39                    #[cfg(debug_assertions)]
40                    track_v2_read(self.node_id as u32);
41
42                    if version[0] == 2 {
43                        // V2 node detected - try to read cluster metadata
44                        let mut node_store = NodeStore::new(self.graph_file);
45                        match node_store.read_node_v2(self.node_id) {
46                            Ok(node_v2) => {
47                                drop(node_store);
48
49                                let (cluster_offset, cluster_size, edge_count) =
50                                    match self.direction {
51                                        Direction::Outgoing => (
52                                            node_v2.outgoing_cluster_offset,
53                                            node_v2.outgoing_cluster_size,
54                                            node_v2.outgoing_edge_count,
55                                        ),
56                                        Direction::Incoming => (
57                                            node_v2.incoming_cluster_offset,
58                                            node_v2.incoming_cluster_size,
59                                            node_v2.incoming_edge_count,
60                                        ),
61                                    };
62
63                                // Phase 35: Only proceed if cluster metadata is complete
64                                if cluster_offset > 0 && cluster_size > 0 && edge_count > 0 {
65                                    #[cfg(debug_assertions)]
66                                    let _cluster_timing =
67                                        start_timing("v2_cluster_neighbor_iteration");
68
69                                    // Phase 69: Read V2 edge cluster directly (avoid circular dependency)
70                                    let neighbors = match self
71                                        .read_v2_edge_cluster_directly(&node_v2)
72                                    {
73                                        Ok(neighbors) => neighbors,
74                                        Err(e) => {
75                                            #[cfg(debug_assertions)]
76                                            println!(
77                                                "DEBUG: V2 cluster read failed for node {}: {}, falling back to edge store traversal",
78                                                self.node_id, e
79                                            );
80
81                                            // Fallback: use edge store to traverse edges directly
82                                            let mut edge_store = EdgeStore::new(self.graph_file);
83                                            edge_store
84                                                .iter_neighbors(self.node_id, self.direction)
85                                                .collect::<Vec<_>>()
86                                        }
87                                    };
88
89                                    // Phase 69: V2 clustered adjacency success
90                                    #[cfg(debug_assertions)]
91                                    {
92                                        println!(
93                                            "DEBUG: V2 clustered adjacency SUCCESS for node {} (direction: {:?}, {} neighbors)",
94                                            self.node_id,
95                                            self.direction,
96                                            neighbors.len()
97                                        );
98
99                                        // Log metrics snapshot
100                                        let metrics = get_metrics();
101                                        println!(
102                                            "DEBUG: Metrics snapshot - iterations: {}, v2_reads: {}, loop_detections: {}",
103                                            metrics.total_iterations,
104                                            metrics.total_v2_reads,
105                                            metrics.infinite_loop_detections
106                                        );
107                                    }
108                                    self.cached_clustered_neighbors = Some(neighbors);
109                                    self.total_count = edge_count;
110                                    return Ok(());
111                                }
112                            }
113                            Err(NativeBackendError::InvalidNodeId { .. }) => {
114                                // Node doesn't exist - cache empty result and propagate error
115                                #[cfg(debug_assertions)]
116                                track_v2_read(self.node_id as u32);
117
118                                self.cached_clustered_neighbors = Some(Vec::new());
119                                self.total_count = 0; // CRITICAL: Update total_count to match empty result
120                                return Err(NativeBackendError::InvalidNodeId { id: 0, max_id: 0 });
121                            }
122                            Err(e) => {
123                                // Phase 35: Propagate unexpected read errors, cache empty result
124                                #[cfg(debug_assertions)]
125                                track_v2_read(self.node_id as u32);
126
127                                self.cached_clustered_neighbors = Some(Vec::new());
128                                self.total_count = 0; // CRITICAL: Update total_count to match empty result
129                                return Err(e);
130                            }
131                        }
132                    }
133                }
134                Err(NativeBackendError::FileTooSmall { .. }) => {
135                    // Node slot out of bounds - cache empty result and return error
136                    self.cached_clustered_neighbors = Some(Vec::new());
137                    self.total_count = 0; // CRITICAL: Update total_count to match empty result
138                    return Err(NativeBackendError::FileTooSmall {
139                        size: 0,
140                        min_size: 1,
141                    });
142                }
143                Err(e) => {
144                    // Phase 35: Propagate unexpected I/O errors, cache empty result
145                    self.cached_clustered_neighbors = Some(Vec::new());
146                    self.total_count = 0; // CRITICAL: Update total_count to match empty result
147                    return Err(e);
148                }
149            }
150        }
151
152        // V2-ONLY: Return error if V2 cluster not found
153        // FIXED: Cache empty result to prevent repeated initialization attempts
154        #[cfg(debug_assertions)]
155        {
156            println!(
157                "DEBUG: V2 clustered adjacency FAILED for node {} - cluster metadata not found",
158                self.node_id
159            );
160            let metrics = get_metrics();
161            println!(
162                "DEBUG: Final metrics - iterations: {}, v2_reads: {}, loop_detections: {}",
163                metrics.total_iterations, metrics.total_v2_reads, metrics.infinite_loop_detections
164            );
165        }
166
167        let error = NativeBackendError::CorruptNodeRecord {
168            node_id: self.node_id as i64,
169            reason: "V2 cluster metadata not found".to_string(),
170        };
171        self.cached_clustered_neighbors = Some(Vec::new()); // Cache empty result
172        self.total_count = 0; // CRITICAL: Update total_count to match empty result
173        Err(error)
174    }
175
176    /// Read V2 edge cluster directly without going through AdjacencyIterator
177    /// This avoids the circular dependency where AdjacencyIterator calls edge_store.iter_neighbors()
178    /// which creates another AdjacencyIterator
179    fn read_v2_edge_cluster_directly(
180        &mut self,
181        node_v2: &crate::backend::native::v2::node_record_v2::NodeRecordV2,
182    ) -> NativeResult<Vec<NativeNodeId>> {
183        use crate::backend::native::v2::edge_cluster::EdgeCluster;
184
185        let (cluster_offset, cluster_size) = match self.direction {
186            Direction::Outgoing => (
187                node_v2.outgoing_cluster_offset,
188                node_v2.outgoing_cluster_size,
189            ),
190            Direction::Incoming => (
191                node_v2.incoming_cluster_offset,
192                node_v2.incoming_cluster_size,
193            ),
194        };
195
196        // If no cluster metadata, return empty result
197        if cluster_offset == 0 || cluster_size == 0 {
198            return Ok(Vec::new());
199        }
200
201        #[cfg(debug_assertions)]
202        println!(
203            "DEBUG: Reading V2 cluster for node {} at offset {}, size {}",
204            self.node_id, cluster_offset, cluster_size
205        );
206
207        // Read cluster data directly from file
208        let mut cluster_data = vec![0u8; cluster_size as usize];
209        self.graph_file
210            .read_bytes(cluster_offset, &mut cluster_data)?;
211
212        // Check if cluster data contains actual data (not all zeros)
213        if cluster_data.iter().all(|&byte| byte == 0) {
214            #[cfg(debug_assertions)]
215            println!("DEBUG: V2 cluster data is all zeros - no edge cluster was written");
216            return Ok(Vec::new());
217        }
218
219        // Try to deserialize the cluster
220        match EdgeCluster::deserialize(&cluster_data) {
221            Ok(cluster) => {
222                let neighbors: Vec<NativeNodeId> = cluster
223                    .iter_neighbors()
224                    .map(|id| id as NativeNodeId)
225                    .collect();
226
227                #[cfg(debug_assertions)]
228                {
229                    println!(
230                        "DEBUG: Direct V2 cluster read for node {} (direction: {:?}) - found {} neighbors",
231                        self.node_id,
232                        self.direction,
233                        neighbors.len()
234                    );
235                }
236
237                Ok(neighbors)
238            }
239            Err(e) => {
240                #[cfg(debug_assertions)]
241                println!(
242                    "DEBUG: Failed to deserialize V2 cluster for node {}: {}",
243                    self.node_id, e
244                );
245
246                // This can happen if cluster metadata exists but no actual cluster was written
247                // Fall back to edge store traversal
248                Err(e)
249            }
250        }
251    }
252}