sqlitegraph 2.0.7

Embedded graph database with full ACID transactions, HNSW vector search, dual backend support, and comprehensive graph algorithms library
Documentation
//! Edge store modular components
//!
//! This module contains the modularized components of the edge store system,
//! broken down into focused, single-responsibility modules for better
//! maintainability and testing.

// Re-export core types from the parent module
pub use crate::backend::native::types::{EdgeFlags, EdgeRecord, NativeEdgeId};

// Import logging macro
use log::debug;

// Import modular components
pub mod capacity_coordinator;
mod cluster_utils;
pub mod id_management;
mod record_operations;
mod utils;

// Re-export utility functions
pub use cluster_utils::{
    calculate_edge_data_offset_in_cluster, calculate_neighbor_offset_in_cluster,
    calculate_optimal_cluster_size, validate_cluster_size,
};
pub use utils::check_for_overlap;

// Re-export record operations
pub use record_operations::EdgeRecordOperations;

/// Edge store manages edge records and adjacency layout in the graph file
///
/// This implementation delegates to modularized components for clean separation of concerns
/// while preserving the original API signature for compatibility.
pub struct EdgeStore<'a> {
    graph_file: &'a mut crate::backend::native::graph_file::GraphFile,
}

impl<'a> EdgeStore<'a> {
    /// Create a new edge store
    pub fn new(graph_file: &'a mut crate::backend::native::graph_file::GraphFile) -> Self {
        Self { graph_file }
    }

    /// Write an edge record to the store with V2 cluster metadata integration
    /// This is the recommended method for V2 systems as it properly updates node cluster metadata
    pub fn write_edge(
        &mut self,
        edge: &crate::backend::native::types::EdgeRecord,
    ) -> crate::backend::native::types::NativeResult<()> {
        self.write_edge_with_cluster_metadata(edge)
    }

    /// Write an edge record and update source/target node cluster metadata
    /// This method ensures proper V2 adjacency by updating cluster metadata on both nodes
    fn write_edge_with_cluster_metadata(
        &mut self,
        edge: &crate::backend::native::types::EdgeRecord,
    ) -> crate::backend::native::types::NativeResult<()> {
        // First, write the edge record itself
        let mut operations = record_operations::EdgeRecordOperations::new(self.graph_file);
        operations.write_edge(edge)?;

        // CRITICAL FIX: Update header edge_count if this edge ID exceeds current count
        // This handles manually assigned edge IDs (like in tests) that don't go through allocate_edge_id()
        let current_edge_count = self.graph_file.header().edge_count;
        if edge.id > current_edge_count as i64 {
            self.graph_file.persistent_header_mut().edge_count = edge.id as u64;
        }

        // Then update cluster metadata on source and target nodes
        self.update_node_cluster_metadata(edge.from_id, edge.to_id)
    }

    /// Update cluster metadata for both source and target nodes to establish adjacency
    /// This ensures that V2 adjacency traversal can find the edges
    fn update_node_cluster_metadata(
        &mut self,
        source_id: crate::backend::native::types::NativeNodeId,
        target_id: crate::backend::native::types::NativeNodeId,
    ) -> crate::backend::native::types::NativeResult<()> {
        use crate::backend::native::node_store::NodeStore;

        // Update source node's outgoing cluster
        {
            let mut node_store = NodeStore::new(self.graph_file);
            let mut source_node = node_store.read_node_v2(source_id)?;
            source_node.outgoing_edge_count += 1;
            // Set minimal cluster metadata if not already set
            if source_node.outgoing_cluster_offset == 0 {
                // Use offset after node region (1536) for outgoing clusters
                source_node.outgoing_cluster_offset = 1536;
                source_node.outgoing_cluster_size = 4096;
            }
            node_store.write_node_v2(&source_node)?;
            drop(node_store); // Release the borrow
        }

        // Update target node's incoming cluster
        {
            let mut node_store = NodeStore::new(self.graph_file);
            let mut target_node = node_store.read_node_v2(target_id)?;
            target_node.incoming_edge_count += 1;
            // Set minimal cluster metadata if not already set
            // IMPORTANT: Use a different offset to avoid overlap with outgoing cluster
            // The outgoing cluster is at [1536, 5632), so incoming starts at 5632
            if target_node.incoming_cluster_offset == 0 {
                target_node.incoming_cluster_offset = 1536 + 4096; // 5632 - after outgoing cluster
                target_node.incoming_cluster_size = 4096;
            }
            node_store.write_node_v2(&target_node)?;
        }

        Ok(())
    }

    /// Read an edge record from the store
    pub fn read_edge(
        &mut self,
        edge_id: crate::backend::native::types::NativeEdgeId,
    ) -> crate::backend::native::types::NativeResult<crate::backend::native::types::EdgeRecord>
    {
        let mut operations = record_operations::EdgeRecordOperations::new(self.graph_file);
        operations.read_edge(edge_id)
    }

    /// Get the maximum edge ID
    pub fn max_edge_id(&mut self) -> crate::backend::native::types::NativeEdgeId {
        let id_manager = id_management::EdgeIdManager::new(self.graph_file);
        id_manager.max_edge_id()
    }

    /// Allocate a new edge ID with capacity coordination
    pub fn allocate_edge_id(&mut self) -> crate::backend::native::types::NativeEdgeId {
        let mut coordinator = capacity_coordinator::EdgeCapacityCoordinator::new(self.graph_file);
        coordinator
            .allocate_edge_id_with_capacity()
            .expect("Failed to allocate edge ID with capacity")
    }

    /// Delete an edge by marking it as deleted (soft deletion)
    ///
    /// This marks the edge as deleted by setting a flag in the edge record.
    /// The edge record remains in storage but is marked as deleted.
    ///
    /// # Arguments
    /// * `edge_id` - The ID of the edge to delete
    ///
    /// # Returns
    /// `Ok(())` if the edge was successfully marked as deleted
    ///
    /// # Note
    /// This is a soft deletion - the edge record remains but is marked as deleted.
    /// This is reversible for rollback scenarios.
    pub fn delete_edge(
        &mut self,
        edge_id: crate::backend::native::types::NativeEdgeId,
    ) -> crate::backend::native::types::NativeResult<()> {
        let mut operations = record_operations::EdgeRecordOperations::new(self.graph_file);
        operations.delete_edge(edge_id)
    }

    /// Iterate over neighbors of a node using V2 cluster adjacency
    /// Returns node IDs that are connected to the specified node in the given direction
    /// AVOIDS CIRCULAR DEPENDENCY: Uses direct edge iteration instead of AdjacencyIterator
    pub fn iter_neighbors(
        &mut self,
        node_id: crate::backend::native::types::NativeNodeId,
        direction: crate::backend::native::adjacency::Direction,
    ) -> Box<dyn Iterator<Item = crate::backend::native::types::NativeNodeId> + '_> {
        // AVOID CIRCULAR DEPENDENCY: Don't use AdjacencyIterator which would call back into V2 cluster system
        // Instead, use direct edge record iteration
        match self.iter_neighbors_direct(node_id, direction) {
            Ok(neighbors) => Box::new(neighbors.into_iter()),
            Err(_) => Box::new(std::iter::empty()),
        }
    }

    /// Iterate edges for a node, returning edge IDs and neighbor node IDs
    ///
    /// This is similar to iter_neighbors but returns both edge_id and neighbor_id for each edge.
    /// This enables operations like edge cascade cleanup where edge IDs are needed.
    ///
    /// # Arguments
    /// * `node_id` - The node to iterate edges for
    /// * `direction` - Outgoing (edges from this node) or Incoming (edges to this node)
    ///
    /// # Returns
    /// Iterator of (edge_id, neighbor_id) tuples
    ///
    /// # Performance Note
    /// This scans all edge records in the database (1 to header.edge_count), which is O(N)
    /// where N is the total number of edges. For large graphs, consider adding an index.
    pub fn iter_edges_with_ids(
        &mut self,
        node_id: crate::backend::native::types::NativeNodeId,
        direction: crate::backend::native::adjacency::Direction,
    ) -> Box<
        dyn Iterator<
                Item = (
                    crate::backend::native::types::NativeEdgeId,
                    crate::backend::native::types::NativeNodeId,
                ),
            > + '_,
    > {
        match self.iter_edges_with_ids_direct(node_id, direction) {
            Ok(edges) => Box::new(edges.into_iter()),
            Err(_) => Box::new(std::iter::empty()),
        }
    }

    /// Direct edge neighbor iteration without going through AdjacencyIterator
    /// This prevents the circular dependency: AdjacencyIterator -> EdgeStore::iter_neighbors -> AdjacencyIterator
    /// Reads directly from legacy edge storage using the edge record operations
    fn iter_neighbors_direct(
        &mut self,
        node_id: crate::backend::native::types::NativeNodeId,
        direction: crate::backend::native::adjacency::Direction,
    ) -> crate::backend::native::types::NativeResult<Vec<crate::backend::native::types::NativeNodeId>>
    {
        use crate::backend::native::node_store::NodeStore;

        // Read V2 node to get edge count information
        let mut node_store = NodeStore::new(self.graph_file);
        let node_v2 = node_store.read_node_v2(node_id)?;
        drop(node_store);

        let edge_count = match direction {
            crate::backend::native::adjacency::Direction::Outgoing => node_v2.outgoing_edge_count,
            crate::backend::native::adjacency::Direction::Incoming => node_v2.incoming_edge_count,
        };

        if edge_count == 0 {
            return Ok(Vec::new());
        }

        // Read edges directly from legacy edge storage by scanning all edges
        let header = self.graph_file.header();
        let mut neighbors = Vec::new();

        for edge_id in 1..=header.edge_count as i64 {
            let mut operations = record_operations::EdgeRecordOperations::new(self.graph_file);
            if let Ok(edge) = operations.read_edge(edge_id) {
                let matches_direction = match direction {
                    crate::backend::native::adjacency::Direction::Outgoing => {
                        edge.from_id == node_id
                    }
                    crate::backend::native::adjacency::Direction::Incoming => edge.to_id == node_id,
                };

                if matches_direction {
                    let neighbor_id = match direction {
                        crate::backend::native::adjacency::Direction::Outgoing => edge.to_id,
                        crate::backend::native::adjacency::Direction::Incoming => edge.from_id,
                    };
                    neighbors.push(neighbor_id);
                }
            }
        }

        Ok(neighbors)
    }

    /// Direct edge iteration with IDs, returning (edge_id, neighbor_id) tuples
    /// Similar to iter_neighbors_direct but includes edge IDs for operations like cascade cleanup
    fn iter_edges_with_ids_direct(
        &mut self,
        node_id: crate::backend::native::types::NativeNodeId,
        direction: crate::backend::native::adjacency::Direction,
    ) -> crate::backend::native::types::NativeResult<
        Vec<(
            crate::backend::native::types::NativeEdgeId,
            crate::backend::native::types::NativeNodeId,
        )>,
    > {
        use crate::backend::native::node_store::NodeStore;

        // Read V2 node to get edge count information
        let mut node_store = NodeStore::new(self.graph_file);
        let node_v2 = node_store.read_node_v2(node_id)?;
        drop(node_store);

        let edge_count = match direction {
            crate::backend::native::adjacency::Direction::Outgoing => node_v2.outgoing_edge_count,
            crate::backend::native::adjacency::Direction::Incoming => node_v2.incoming_edge_count,
        };

        if edge_count == 0 {
            return Ok(Vec::new());
        }

        debug!(
            "Direct edge iteration with IDs for node {} (direction: {:?}) - {} edges expected",
            node_id, direction, edge_count
        );

        // Read edges directly from legacy edge storage by scanning all edges
        let header = self.graph_file.header();
        let mut edges = Vec::new();

        for edge_id in 1..=header.edge_count as i64 {
            let mut operations = record_operations::EdgeRecordOperations::new(self.graph_file);
            if let Ok(edge) = operations.read_edge(edge_id) {
                let matches_direction = match direction {
                    crate::backend::native::adjacency::Direction::Outgoing => {
                        edge.from_id == node_id
                    }
                    crate::backend::native::adjacency::Direction::Incoming => edge.to_id == node_id,
                };

                if matches_direction {
                    let neighbor_id = match direction {
                        crate::backend::native::adjacency::Direction::Outgoing => edge.to_id,
                        crate::backend::native::adjacency::Direction::Incoming => edge.from_id,
                    };
                    edges.push((edge_id, neighbor_id));
                }
            }
        }

        debug!(
            "Direct edge iteration with IDs found {} edges for node {} (direction: {:?})",
            edges.len(),
            node_id,
            direction
        );

        Ok(edges)
    }

    /// Allocate adjacency space for outgoing edges
    pub fn allocate_outgoing_adjacency(
        &mut self,
        node_id: crate::backend::native::types::NativeNodeId,
        count: u32,
    ) -> crate::backend::native::types::NativeResult<crate::backend::native::types::FileOffset>
    {
        let mut allocator = id_management::AdjacencyAllocator::new(self.graph_file);
        allocator.allocate_outgoing_adjacency(node_id, count)
    }

    /// Allocate adjacency space for incoming edges
    pub fn allocate_incoming_adjacency(
        &mut self,
        node_id: crate::backend::native::types::NativeNodeId,
        count: u32,
    ) -> crate::backend::native::types::NativeResult<crate::backend::native::types::FileOffset>
    {
        let mut allocator = id_management::AdjacencyAllocator::new(self.graph_file);
        allocator.allocate_incoming_adjacency(node_id, count)
    }
}