use crate::backend::native::graph_file::GraphFile;
use crate::backend::native::node_store::NodeStore;
use crate::backend::native::types::{NativeNodeId, NativeResult};
use crate::backend::native::v2::node_record_v2::NodeRecordV2;
use ahash::AHashMap;
/// Read-ahead buffer for sequential scans over the native graph file.
///
/// Holds a window of prefetched node records plus a cache of raw cluster
/// bytes so that sequential readers avoid repeated single-record I/O.
pub struct SequentialReadBuffer {
    /// Prefetched node records, keyed by node id.
    slots: AHashMap<NativeNodeId, NodeRecordV2>,
    /// Raw cluster bytes keyed by file offset (presumably adjacency/edge
    /// cluster data referenced by the node records — see `prefetch_clusters_from`).
    cluster_cache: AHashMap<u64, Vec<u8>>,
    /// Number of node slots fetched per prefetch call.
    prefetch_window: usize,
    /// Node id at which the next sequential prefetch should begin, if any.
    next_prefetch_start: Option<NativeNodeId>,
}
impl SequentialReadBuffer {
    /// Default number of node slots fetched per prefetch call.
    const DEFAULT_PREFETCH_WINDOW: usize = 8;

    /// Creates an empty buffer with the default prefetch window.
    pub fn new() -> Self {
        // Delegate so the construction logic lives in exactly one place.
        Self::with_prefetch_window(Self::DEFAULT_PREFETCH_WINDOW)
    }

    /// Creates an empty buffer that prefetches `prefetch_window` node slots
    /// per call to [`Self::prefetch_from`].
    pub fn with_prefetch_window(prefetch_window: usize) -> Self {
        Self {
            slots: AHashMap::new(),
            cluster_cache: AHashMap::new(),
            prefetch_window,
            next_prefetch_start: None,
        }
    }

    /// Returns the buffered record for `node_id`, if present.
    #[inline]
    pub fn get(&self, node_id: NativeNodeId) -> Option<&NodeRecordV2> {
        self.slots.get(&node_id)
    }

    /// Returns `true` if a record for `node_id` is buffered.
    #[inline]
    pub fn contains(&self, node_id: NativeNodeId) -> bool {
        self.slots.contains_key(&node_id)
    }

    /// Number of buffered node records.
    #[inline]
    pub fn len(&self) -> usize {
        self.slots.len()
    }

    /// Returns `true` if no node records are buffered.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.slots.is_empty()
    }

    /// Inserts a batch of records, overwriting any existing entries with the
    /// same id.
    pub fn insert_batch(&mut self, nodes: Vec<NodeRecordV2>) {
        for node in nodes {
            self.slots.insert(node.id, node);
        }
    }

    /// Inserts a single record, overwriting any existing entry with the same id.
    pub fn insert(&mut self, node: NodeRecordV2) {
        self.slots.insert(node.id, node);
    }

    /// Reads up to `prefetch_window` node slots starting at `start_node_id`
    /// into the buffer and records where the next sequential prefetch begins.
    ///
    /// # Errors
    /// Propagates any error from the underlying batch slot read.
    pub fn prefetch_from(
        &mut self,
        graph_file: &mut GraphFile,
        start_node_id: NativeNodeId,
    ) -> NativeResult<()> {
        let mut node_store = NodeStore::new(graph_file);
        let nodes = node_store.read_slots_batch(start_node_id, self.prefetch_window)?;
        self.insert_batch(nodes);
        // NOTE(review): advances by the full window even if fewer slots were
        // actually returned (e.g. near end of file) — confirm read_slots_batch
        // semantics before relying on next_prefetch_start for continuation.
        self.next_prefetch_start = Some(start_node_id + self.prefetch_window as i64);
        Ok(())
    }

    /// Prefetches node slots starting at `start_node_id`, then eagerly reads
    /// the outgoing/incoming cluster bytes referenced by every buffered record
    /// into the cluster cache.
    ///
    /// Cluster reads are best-effort: an individual failed read is skipped
    /// rather than aborting the whole prefetch.
    ///
    /// # Errors
    /// Propagates only errors from the node-slot prefetch itself.
    pub fn prefetch_clusters_from(
        &mut self,
        graph_file: &mut GraphFile,
        start_node_id: NativeNodeId,
    ) -> NativeResult<()> {
        self.prefetch_from(graph_file, start_node_id)?;

        // Collect (offset, size) pairs first: we cannot mutate cluster_cache
        // while iterating over an immutable borrow of slots.
        let mut cluster_reads: Vec<(u64, u32)> = Vec::new();
        for node_record in self.slots.values() {
            if node_record.outgoing_cluster_offset > 0 && node_record.outgoing_cluster_size > 0 {
                cluster_reads.push((
                    node_record.outgoing_cluster_offset,
                    node_record.outgoing_cluster_size,
                ));
            }
            if node_record.incoming_cluster_offset > 0 && node_record.incoming_cluster_size > 0 {
                cluster_reads.push((
                    node_record.incoming_cluster_offset,
                    node_record.incoming_cluster_size,
                ));
            }
        }

        for (cluster_offset, cluster_size) in cluster_reads {
            // Skip clusters already cached (also dedupes repeated offsets
            // collected above, since the cache is updated as we go).
            if self.cluster_cache.contains_key(&cluster_offset) {
                continue;
            }
            let mut cluster_data = vec![0u8; cluster_size as usize];
            if graph_file
                .read_bytes(cluster_offset, &mut cluster_data)
                .is_ok()
            {
                self.cluster_cache.insert(cluster_offset, cluster_data);
            }
        }
        Ok(())
    }

    /// Returns the cached raw bytes for the cluster at `cluster_offset`, if present.
    #[inline]
    pub fn get_cluster(&self, cluster_offset: u64) -> Option<&[u8]> {
        self.cluster_cache
            .get(&cluster_offset)
            .map(|v| v.as_slice())
    }

    /// Returns `true` if the cluster at `cluster_offset` is cached.
    #[inline]
    pub fn has_cluster(&self, cluster_offset: u64) -> bool {
        self.cluster_cache.contains_key(&cluster_offset)
    }

    /// Number of cached clusters.
    #[inline]
    pub fn cluster_cache_len(&self) -> usize {
        self.cluster_cache.len()
    }

    /// Node id where the next sequential prefetch would begin, if a prefetch
    /// has occurred since the last `clear`.
    pub fn next_prefetch_start(&self) -> Option<NativeNodeId> {
        self.next_prefetch_start
    }

    /// Configured number of node slots fetched per prefetch call.
    pub fn prefetch_window(&self) -> usize {
        self.prefetch_window
    }

    /// Drops all buffered records and cached clusters and resets the
    /// prefetch cursor; the configured window is kept.
    pub fn clear(&mut self) {
        self.slots.clear();
        self.cluster_cache.clear();
        self.next_prefetch_start = None;
    }
}
impl Default for SequentialReadBuffer {
#[inline]
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    // Fix: removed `use crate::backend::native::NodeFlags;` — NodeFlags was
    // never referenced in this module and triggered an unused-import warning.
    use super::*;

    #[test]
    fn test_buffer_new() {
        let buffer = SequentialReadBuffer::new();
        assert_eq!(buffer.len(), 0);
        assert!(buffer.is_empty());
        assert!(!buffer.contains(1));
        assert_eq!(buffer.prefetch_window(), 8);
        assert!(buffer.next_prefetch_start().is_none());
    }

    #[test]
    fn test_buffer_default() {
        let buffer = SequentialReadBuffer::default();
        assert_eq!(buffer.len(), 0);
        assert!(buffer.is_empty());
        assert_eq!(buffer.prefetch_window(), 8);
    }

    #[test]
    fn test_buffer_insert_get() {
        let mut buffer = SequentialReadBuffer::new();
        let node = NodeRecordV2::new(1, "Test".into(), "node1".into(), serde_json::json!({}));
        buffer.insert(node);
        assert!(buffer.contains(1));
        assert_eq!(buffer.len(), 1);
        assert!(!buffer.is_empty());
        let retrieved = buffer.get(1);
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().id, 1);
        assert!(buffer.get(999).is_none());
        assert!(!buffer.contains(999));
    }

    #[test]
    fn test_buffer_insert_batch() {
        let mut buffer = SequentialReadBuffer::new();
        let nodes = vec![
            NodeRecordV2::new(1, "Type1".into(), "node1".into(), serde_json::json!({})),
            NodeRecordV2::new(2, "Type2".into(), "node2".into(), serde_json::json!({})),
            NodeRecordV2::new(3, "Type3".into(), "node3".into(), serde_json::json!({})),
        ];
        buffer.insert_batch(nodes);
        assert_eq!(buffer.len(), 3);
        assert!(buffer.contains(1));
        assert!(buffer.contains(2));
        assert!(buffer.contains(3));
    }

    #[test]
    fn test_buffer_custom_window() {
        let buffer = SequentialReadBuffer::with_prefetch_window(4);
        assert_eq!(buffer.prefetch_window(), 4);
        assert_eq!(buffer.len(), 0);
    }

    #[test]
    fn test_buffer_clear() {
        let mut buffer = SequentialReadBuffer::new();
        buffer.insert(NodeRecordV2::new(
            1,
            "Test".into(),
            "node1".into(),
            serde_json::json!({}),
        ));
        buffer.insert(NodeRecordV2::new(
            2,
            "Test".into(),
            "node2".into(),
            serde_json::json!({}),
        ));
        assert_eq!(buffer.len(), 2);
        buffer.clear();
        assert_eq!(buffer.len(), 0);
        assert!(buffer.is_empty());
        assert!(!buffer.contains(1));
        assert!(buffer.next_prefetch_start().is_none());
        assert_eq!(buffer.cluster_cache_len(), 0);
    }

    #[test]
    fn test_buffer_get_returns_reference() {
        let mut buffer = SequentialReadBuffer::new();
        let data = serde_json::json!({"key": "value"});
        let node = NodeRecordV2::new(1, "Test".into(), "node1".into(), data.clone());
        buffer.insert(node);
        let retrieved = buffer.get(1).unwrap();
        assert_eq!(retrieved.id, 1);
        assert_eq!(retrieved.kind, "Test");
        assert_eq!(retrieved.name, "node1");
        assert_eq!(retrieved.data, data);
    }

    #[test]
    fn test_buffer_overwrite() {
        let mut buffer = SequentialReadBuffer::new();
        buffer.insert(NodeRecordV2::new(
            1,
            "Type1".into(),
            "node1".into(),
            serde_json::json!({}),
        ));
        assert_eq!(buffer.len(), 1);
        buffer.insert(NodeRecordV2::new(
            1,
            "Type2".into(),
            "node1_v2".into(),
            serde_json::json!({}),
        ));
        assert_eq!(buffer.len(), 1);
        let retrieved = buffer.get(1).unwrap();
        assert_eq!(retrieved.kind, "Type2");
        assert_eq!(retrieved.name, "node1_v2");
    }

    #[test]
    fn test_buffer_empty_behavior() {
        let buffer = SequentialReadBuffer::new();
        assert!(buffer.get(1).is_none());
        assert!(buffer.get(999).is_none());
        assert!(!buffer.contains(1));
        assert_eq!(buffer.len(), 0);
        assert!(buffer.is_empty());
    }

    #[test]
    fn test_buffer_mvcc_isolation() {
        let mut buffer1 = SequentialReadBuffer::new();
        let mut buffer2 = SequentialReadBuffer::new();
        let node1 = NodeRecordV2::new(1, "T".into(), "a".into(), serde_json::json!({}));
        let node2 = NodeRecordV2::new(2, "T".into(), "b".into(), serde_json::json!({}));
        buffer1.insert(node1);
        assert!(!buffer2.contains(1));
        assert_eq!(buffer2.len(), 0);
        buffer2.insert(node2);
        assert!(buffer1.contains(1));
        assert!(!buffer1.contains(2));
        assert!(!buffer2.contains(1));
        assert!(buffer2.contains(2));
    }

    #[test]
    fn test_cluster_cache_get_and_has() {
        let buffer = SequentialReadBuffer::new();
        assert!(!buffer.has_cluster(100));
        assert!(buffer.get_cluster(100).is_none());
        // Fix: a duplicated identical assertion was removed here.
        assert_eq!(buffer.cluster_cache_len(), 0);
    }

    #[test]
    fn test_cluster_cache_clear() {
        let mut buffer = SequentialReadBuffer::new();
        buffer.insert(NodeRecordV2::new(
            1,
            "Test".into(),
            "node1".into(),
            serde_json::json!({}),
        ));
        assert_eq!(buffer.cluster_cache_len(), 0);
        buffer.clear();
        assert_eq!(buffer.cluster_cache_len(), 0);
    }

    #[test]
    fn test_buffer_new_includes_cluster_cache() {
        let buffer = SequentialReadBuffer::new();
        assert_eq!(buffer.cluster_cache_len(), 0);
        assert!(!buffer.has_cluster(0));
        assert!(!buffer.has_cluster(999));
    }

    #[test]
    fn test_buffer_with_custom_window_includes_cluster_cache() {
        let buffer = SequentialReadBuffer::with_prefetch_window(16);
        assert_eq!(buffer.cluster_cache_len(), 0);
        assert_eq!(buffer.prefetch_window(), 16);
    }
}