use anyhow::{anyhow, bail, Context, Result};
use bytemuck::{bytes_of, from_bytes, Pod, Zeroable};
use memmap2::MmapMut;
use parking_lot::{Mutex, RwLock};
use std::fs::{File, OpenOptions};
use std::io::{Read, Result as IoResult, Write};
use std::mem::size_of;
use std::path::Path;
use std::sync::atomic::{AtomicU64, Ordering};
use crate::storage::data_structures::{
EdgeRec, NodeRec, WalEntry, EDGE_FLAG_TOMBSTONE, NODE_FLAG_TOMBSTONE,
};
use crate::storage::wal::Wal;
/// Format version written into the header of every newly created segment file.
const CURRENT_VERSION: u32 = 1;
/// Header stored at the start of the node segment file (`nodes.dat`).
///
/// The struct is `#[repr(C)]` and `Pod`, so it is read from and written to the memory
/// map as raw bytes. Its size also defines `SEGMENT_HEADER_LEN`.
#[repr(C)]
#[derive(Clone, Copy, Zeroable, Pod, Debug)]
pub struct NodesHeader {
/// Segment identifier; new node segments are created with `0x4E4F4445` (ASCII "NODE").
pub magic: u32,
/// Format version; set to `CURRENT_VERSION` when the segment is created.
pub version: u32,
/// Bytes of the record area in use (header excluded). `write_node_at` raises it to
/// `(id + 1) * size_of::<NodeRec>()` when a record beyond the current end is written.
pub logical_len: u64,
/// Initialised to 1 on creation. Nothing in this file reads or updates it afterwards;
/// node IDs are handed out from the manifest instead.
pub next_id: u64,
/// Zero-filled on creation; presumably spare room for future fields.
pub reserved: [u8; 32],
}
/// Header stored at the start of the edge segment file (`edges.csr`).
///
/// Note that the struct is smaller than `SEGMENT_HEADER_LEN`; edge records still start
/// at `SEGMENT_HEADER_LEN`, not directly after this header.
#[repr(C)]
#[derive(Clone, Copy, Zeroable, Pod, Debug)]
pub struct EdgesHeader {
/// Segment identifier; new edge segments are created with `0x45444745` (ASCII "EDGE").
pub magic: u32,
/// Format version; set to `CURRENT_VERSION` when the segment is created.
pub version: u32,
/// Bytes of the record area in use (header excluded). Advanced by one record size per
/// `append_edge_record` and rewritten by `compact_edges`.
pub logical_len: u64,
/// Zero-filled on creation; presumably spare room for future fields.
pub reserved: [u8; 32],
}
/// Header written at the start of a newly created octree segment file (`octree.idx`).
#[repr(C)]
#[derive(Clone, Copy, Zeroable, Pod, Debug)]
pub struct OctreeHeader {
/// Segment identifier; new octree segments are created with `0x4F435452` (ASCII "OCTR").
pub magic: u32,
/// Format version; set to `CURRENT_VERSION` when the segment is created.
pub version: u32,
/// Initialised to 0 on creation; not read or updated anywhere in this file.
pub logical_len: u64,
/// Zero-filled on creation; presumably spare room for future fields.
pub reserved: [u8; 32],
}
/// Header that `open_segment` writes for `SegmentKind::Wal`.
///
/// NOTE(review): `GraphStorageManager::open` opens the WAL through `Wal::open` rather
/// than `open_segment`, so within this file the header is only ever constructed, never
/// read back — confirm whether the WAL engine uses the same layout.
#[repr(C)]
#[derive(Clone, Copy, Zeroable, Pod, Debug)]
pub struct WalHeader {
/// Segment identifier; created with `0x57414C47` (ASCII "WALG").
pub magic: u32,
/// Format version; set to `CURRENT_VERSION` when the segment is created.
pub version: u32,
/// Initialised to 0 on creation; not read or updated anywhere in this file.
pub logical_len: u64,
/// Zero-filled on creation; presumably spare room for future fields.
pub reserved: [u8; 32],
}
/// Number of bytes reserved for the header at the start of every segment file; records
/// begin at this offset. It is the size of `NodesHeader` for all segment kinds, even
/// though the other header structs are smaller.
pub(crate) const SEGMENT_HEADER_LEN: usize = std::mem::size_of::<NodesHeader>();
/// Magic number stored in `StorageManifest::magic` (the bytes spell "GGMN" in ASCII).
pub(crate) const MANIFEST_MAGIC_V2: u32 = 0x4747_4d4e;
/// Version number stored in `StorageManifest::version` for newly created manifests.
const MANIFEST_VERSION_V2: u32 = 1;
/// Summary returned by `GraphStorageManager::compact_edges`.
#[derive(Debug, Clone, Copy)]
pub struct CompactionStats {
/// Number of tombstoned edge records dropped by the compaction.
pub edges_removed: u32,
/// Difference, in bytes, between the edge record area before and after compaction.
pub space_reclaimed: u64,
}
/// Selects which header `open_segment` writes when it creates a new segment file.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SegmentKind {
/// Node records; header type `NodesHeader`.
Nodes,
/// Edge records; header type `EdgesHeader`.
Edges,
/// Octree index; header type `OctreeHeader`.
Octree,
/// Write-ahead log; header type `WalHeader`.
Wal,
}
/// A segment file together with a writable memory map of its contents.
pub struct MappedFile {
/// Open handle to the backing file; used for resizing and `sync_data`.
pub file: File,
/// Writable mapping of `file`. It is replaced with a fresh mapping whenever the file
/// length is changed.
pub mmap: MmapMut,
/// File length in bytes (header included) as recorded at open time and updated by the
/// `resize_*` methods.
pub current_len: u64,
}
impl MappedFile {
    /// Flushes the whole mapping and then syncs the file's data to disk.
    ///
    /// # Errors
    /// Returns the first I/O error reported by the map flush or by `sync_data`; the
    /// file sync is skipped when the map flush fails.
    pub fn flush_all(&mut self) -> IoResult<()> {
        self.mmap.flush().and_then(|()| self.file.sync_data())
    }

    /// Flushes only the first `header_len` bytes of the mapping, then syncs the file's
    /// data to disk.
    ///
    /// # Errors
    /// Returns the first I/O error reported by the range flush or by `sync_data`.
    pub fn flush_header_range(&mut self, header_len: usize) -> IoResult<()> {
        self.mmap
            .flush_range(0, header_len)
            .and_then(|()| self.file.sync_data())
    }
}
/// Fixed-layout metadata record persisted to `manifest.meta`.
///
/// The struct is `#[repr(C)]` and `Pod`; `persist_manifest` writes its raw bytes and
/// `GraphStorageManager::open` reads them back.
#[repr(C)]
#[derive(Clone, Copy, Debug, Pod, Zeroable)]
pub struct StorageManifest {
/// Manifest identifier; new manifests use `MANIFEST_MAGIC_V2`.
pub magic: u32,
/// Manifest format version; new manifests use `MANIFEST_VERSION_V2`.
pub version: u32,
/// Non-zero when the manifest was persisted with the clean flag set (see `close`).
pub clean_shutdown: u8,
/// Explicit padding so the following `u64` fields are 8-byte aligned.
pub _reserved: [u8; 7],
/// Initialised to 0; not updated anywhere in this file.
pub nodes_len: u64,
/// Initialised to 0; not updated anywhere in this file.
pub edges_len: u64,
/// Initialised to 0; not updated anywhere in this file.
pub wal_len: u64,
/// Next node ID to hand out; starts at 1 and is incremented on every allocation.
pub next_node_id: u64,
/// WAL LSN recorded by the most recent `checkpoint`.
pub checkpoint_lsn: u64,
}
/// The default manifest is the same value that `StorageManifest::new` produces.
impl Default for StorageManifest {
fn default() -> Self {
Self::new()
}
}
impl StorageManifest {
    /// Builds the manifest used for a brand-new store: current magic and version, not
    /// clean, all lengths and the checkpoint LSN at zero, and node IDs starting at 1.
    pub fn new() -> Self {
        Self {
            // Identification.
            magic: MANIFEST_MAGIC_V2,
            version: MANIFEST_VERSION_V2,
            // Shutdown state.
            clean_shutdown: 0,
            _reserved: [0; 7],
            // Nothing has been written yet.
            nodes_len: 0,
            edges_len: 0,
            wal_len: 0,
            checkpoint_lsn: 0,
            // ID 0 is never handed out.
            next_node_id: 1,
        }
    }

    /// Reports whether the clean-shutdown flag is set (any non-zero byte counts).
    fn _is_clean(&self) -> bool {
        self.clean_shutdown > 0
    }

    /// Stores `clean` in the flag byte as 1 (true) or 0 (false).
    fn set_clean(&mut self, clean: bool) {
        self.clean_shutdown = u8::from(clean);
    }
}
/// Owns the memory-mapped segment files, the WAL engine and the manifest of one store
/// directory, plus simple read/write counters.
pub struct GraphStorageManager {
/// Node segment (`nodes.dat`).
pub nodes: RwLock<MappedFile>,
/// Edge segment (`edges.csr`).
pub edges: RwLock<MappedFile>,
/// Octree segment (`octree.idx`).
pub octree: RwLock<MappedFile>,
/// Write-ahead log engine opened on `store.wal`.
pub wal_engine: Wal,
/// Location of the manifest file (`manifest.meta`) inside the store directory.
pub manifest_path: std::path::PathBuf,
/// In-memory manifest; written to disk only by `persist_manifest`.
pub manifest: Mutex<StorageManifest>,
/// Incremented once per `get_node_record` call.
pub node_reads: AtomicU64,
/// Incremented once per `write_node_at` call.
pub node_writes: AtomicU64,
/// Incremented once per `read_edge_raw` call.
pub edge_reads: AtomicU64,
/// Incremented once per `write_edge_at` call.
pub edge_writes: AtomicU64,
}
impl GraphStorageManager {
pub fn open(
base_path: impl AsRef<Path>,
initial_node_size: u64,
initial_edge_size: u64,
initial_octree_size: u64,
_initial_wal_size: u64,
) -> Result<Self> {
let base_path_buf = base_path.as_ref().to_path_buf();
std::fs::create_dir_all(&base_path_buf)?;
let nodes_path = base_path_buf.join("nodes.dat");
let edges_path = base_path_buf.join("edges.csr");
let octree_path = base_path_buf.join("octree.idx");
let wal_path = base_path_buf.join("store.wal");
let manifest_path = base_path_buf.join("manifest.meta");
let manifest = if manifest_path.exists() {
let mut file = File::open(&manifest_path)?;
let mut buf = [0u8; size_of::<StorageManifest>()];
file.read_exact(&mut buf)?;
*from_bytes::<StorageManifest>(&buf)
} else {
StorageManifest::new()
};
let nodes_file = open_segment(&nodes_path, initial_node_size, SegmentKind::Nodes)?;
let edges_file = open_segment(&edges_path, initial_edge_size, SegmentKind::Edges)?;
let octree_file = open_segment(&octree_path, initial_octree_size, SegmentKind::Octree)?;
let wal_engine = Wal::open(&wal_path, 100)?;
Ok(GraphStorageManager {
nodes: RwLock::new(nodes_file),
edges: RwLock::new(edges_file),
octree: RwLock::new(octree_file),
wal_engine,
manifest_path,
manifest: Mutex::new(manifest),
node_reads: AtomicU64::new(0),
node_writes: AtomicU64::new(0),
edge_reads: AtomicU64::new(0),
edge_writes: AtomicU64::new(0),
})
}
/// Hands out the next node ID from the in-memory manifest and advances the counter.
///
/// The change is not persisted until the manifest is next written to disk.
pub fn allocate_node_id(&self) -> u64 {
    let mut guard = self.manifest.lock();
    let issued = guard.next_node_id;
    guard.next_node_id = issued + 1;
    issued
}
/// Returns the number of node record slots covered by the node header's
/// `logical_len` (tombstoned and never-written slots included).
pub fn node_count(&self) -> usize {
    let guard = self.nodes.read();
    let header_bytes = &guard.mmap[0..size_of::<NodesHeader>()];
    let used_bytes = from_bytes::<NodesHeader>(header_bytes).logical_len;
    let record_bytes = size_of::<NodeRec>() as u64;
    (used_bytes / record_bytes) as usize
}
/// Returns the number of edge records covered by the edge header's `logical_len`
/// (tombstoned records included).
pub fn edge_count(&self) -> usize {
    let guard = self.edges.read();
    let header_bytes = &guard.mmap[0..size_of::<EdgesHeader>()];
    let used_bytes = from_bytes::<EdgesHeader>(header_bytes).logical_len;
    let record_bytes = size_of::<EdgeRec>() as u64;
    (used_bytes / record_bytes) as usize
}
pub fn get_node_record(&self, id: u64) -> Result<NodeRec> {
self.node_reads.fetch_add(1, Ordering::Relaxed);
let nodes_lock = self.nodes.read();
let offset = SEGMENT_HEADER_LEN + id as usize * size_of::<NodeRec>();
if offset + size_of::<NodeRec>() > nodes_lock.mmap.len() {
bail!("Node ID {} out of bounds", id);
}
Ok(*from_bytes(
&nodes_lock.mmap[offset..offset + size_of::<NodeRec>()],
))
}
/// Reserves the next node ID; identical to `allocate_node_id` but wrapped in `Ok`.
///
/// No space is reserved in the node segment; the segment grows when the record is
/// actually written.
pub fn reserve_node_slot(&self) -> Result<u64> {
    Ok(self.allocate_node_id())
}
pub fn write_node_at(&self, record: &NodeRec) -> Result<()> {
self.node_writes.fetch_add(1, Ordering::Relaxed);
let node_size = size_of::<NodeRec>() as u64;
let required_len = (record.id + 1) * node_size;
{
let nodes_lock = self.nodes.read();
if required_len > nodes_lock.mmap.len().saturating_sub(SEGMENT_HEADER_LEN) as u64 {
drop(nodes_lock);
self.resize_nodes(required_len * 2)?;
}
}
let mut nodes_lock = self.nodes.write();
let offset = SEGMENT_HEADER_LEN + record.id as usize * size_of::<NodeRec>();
nodes_lock.mmap[offset..offset + size_of::<NodeRec>()].copy_from_slice(bytes_of(record));
let mut header: NodesHeader = *from_bytes(&nodes_lock.mmap[0..size_of::<NodesHeader>()]);
header.logical_len = header.logical_len.max(required_len);
nodes_lock.mmap[0..size_of::<NodesHeader>()].copy_from_slice(bytes_of(&header));
Ok(())
}
pub fn read_edge_raw(&self, index: u32) -> Result<EdgeRec> {
self.edge_reads.fetch_add(1, Ordering::Relaxed);
let edges_lock = self.edges.read();
let offset = SEGMENT_HEADER_LEN + index as usize * size_of::<EdgeRec>();
if offset + size_of::<EdgeRec>() > edges_lock.mmap.len() {
bail!("Edge index {} out of bounds", index);
}
Ok(*from_bytes(
&edges_lock.mmap[offset..offset + size_of::<EdgeRec>()],
))
}
pub fn write_edge_at(&self, index: u32, record: &EdgeRec) -> Result<()> {
self.edge_writes.fetch_add(1, Ordering::Relaxed);
let edge_size = size_of::<EdgeRec>();
let mut edges_lock = self.edges.write();
let offset = SEGMENT_HEADER_LEN + index as usize * edge_size;
if offset + edge_size > edges_lock.mmap.len() {
bail!("Edge index {} out of bounds", index);
}
edges_lock.mmap[offset..offset + edge_size].copy_from_slice(bytes_of(record));
Ok(())
}
pub fn append_edge_record(&self, record: &EdgeRec) -> Result<u32> {
let edge_size = size_of::<EdgeRec>() as u64;
let mut edges_lock = self.edges.write();
let mut header: EdgesHeader = *from_bytes(&edges_lock.mmap[0..size_of::<EdgesHeader>()]);
let index = (header.logical_len / edge_size) as u32;
let required_len = header.logical_len + edge_size;
if required_len > edges_lock.mmap.len().saturating_sub(SEGMENT_HEADER_LEN) as u64 {
drop(edges_lock);
self.resize_edges(required_len * 2)?;
edges_lock = self.edges.write();
}
let offset = SEGMENT_HEADER_LEN + index as usize * size_of::<EdgeRec>();
edges_lock.mmap[offset..offset + size_of::<EdgeRec>()].copy_from_slice(bytes_of(record));
header.logical_len = required_len;
edges_lock.mmap[0..size_of::<EdgesHeader>()].copy_from_slice(bytes_of(&header));
Ok(index)
}
/// Records one more outgoing edge on node `node_id`.
///
/// The first registered edge sets the node's `edge_off` to `edge_index`; later calls
/// only bump `edge_len` and ignore `edge_index`.
///
/// NOTE(review): this is an unlocked read-modify-write of the node record, and it
/// looks like it assumes a node's edges occupy consecutive indices starting at
/// `edge_off` — confirm that callers append them contiguously.
///
/// # Errors
/// Propagates failures from reading or writing the node record.
pub fn register_edge_for_node(&self, node_id: u64, edge_index: u32) -> Result<()> {
    let mut rec = self.get_node_record(node_id)?;
    let is_first_edge = rec.edge_len == 0;
    rec.edge_len += 1;
    if is_first_edge {
        rec.edge_off = edge_index;
    }
    self.write_node_at(&rec)
}
pub fn resize_nodes(&self, new_data_len: u64) -> Result<()> {
let mut nodes_lock = self.nodes.write();
let total_len = new_data_len + SEGMENT_HEADER_LEN as u64;
nodes_lock.file.set_len(total_len)?;
nodes_lock.mmap = unsafe { MmapMut::map_mut(&nodes_lock.file)? };
nodes_lock.current_len = total_len;
Ok(())
}
pub fn resize_edges(&self, new_data_len: u64) -> Result<()> {
let mut edges_lock = self.edges.write();
let total_len = new_data_len + SEGMENT_HEADER_LEN as u64;
edges_lock.file.set_len(total_len)?;
edges_lock.mmap = unsafe { MmapMut::map_mut(&edges_lock.file)? };
edges_lock.current_len = total_len;
Ok(())
}
pub fn resize_octree(&self, new_data_len: u64) -> Result<()> {
let mut octree_lock = self.octree.write();
let total_len = new_data_len + SEGMENT_HEADER_LEN as u64;
octree_lock.file.set_len(total_len)?;
octree_lock.mmap = unsafe { MmapMut::map_mut(&octree_lock.file)? };
octree_lock.current_len = total_len;
Ok(())
}
/// Appends a copy of `entry` to the WAL engine.
///
/// # Errors
/// Returns whatever error the WAL engine's `append` reports.
pub fn append_wal_entry(&self, entry: &WalEntry) -> Result<()> {
self.wal_engine.append(*entry)
}
/// Returns the entries produced by the WAL engine's `replay`.
///
/// # Errors
/// Returns whatever error the WAL engine's `replay` reports.
pub fn read_wal_entries(&self) -> Result<Vec<WalEntry>> {
self.wal_engine.replay()
}
/// Returns the checkpoint LSN currently held in the in-memory manifest.
pub fn get_checkpoint_lsn(&self) -> u64 {
self.manifest.lock().checkpoint_lsn
}
/// Returns the WAL engine's entry count, which this type uses as the current LSN.
pub fn get_current_wal_lsn(&self) -> u64 {
self.wal_engine.entry_count() }
pub fn sync_all(&self) -> Result<()> {
let mut errors = Vec::new();
if let Err(e) = self.nodes.write().flush_all() {
errors.push(format!("nodes: {}", e));
}
if let Err(e) = self.edges.write().flush_all() {
errors.push(format!("edges: {}", e));
}
if let Err(e) = self.octree.write().flush_all() {
errors.push(format!("octree: {}", e));
}
if let Err(e) = self.wal_engine.flush() {
errors.push(format!("wal: {}", e));
}
if !errors.is_empty() {
bail!("sync_all failed: {}", errors.join(", "));
}
Ok(())
}
/// Flushes all storage, records the WAL LSN in the manifest and persists the manifest.
///
/// The LSN is sampled before the flush and is the value returned.
///
/// NOTE(review): the in-memory manifest is marked clean here, but the copy written to
/// disk is persisted with `clean = false`; confirm which of the two is intended.
///
/// # Errors
/// Returns an error when flushing storage or persisting the manifest fails.
pub fn checkpoint(&self) -> Result<u64> {
    let lsn = self.get_current_wal_lsn();
    self.sync_all()
        .context("failed to sync storage during checkpoint")?;
    {
        // Scoped so the lock is released before `persist_manifest` takes it again.
        let mut guard = self.manifest.lock();
        guard.set_clean(true);
        guard.checkpoint_lsn = lsn;
    }
    self.persist_manifest(false)
        .context("failed to persist manifest during checkpoint")?;
    Ok(lsn)
}
pub fn persist_manifest(&self, clean: bool) -> Result<()> {
let manifest = self.manifest.lock();
let mut temp_manifest = *manifest;
temp_manifest.set_clean(clean);
let bytes = bytes_of(&temp_manifest);
let temp_path = self.manifest_path.with_extension("tmp");
{
let mut file = File::create(&temp_path)
.map_err(|e| anyhow!("Failed to create temp manifest: {}", e))?;
file.write_all(bytes)
.map_err(|e| anyhow!("Failed to write temp manifest: {}", e))?;
file.sync_data()
.map_err(|e| anyhow!("Failed to sync temp manifest: {}", e))?;
}
std::fs::rename(&temp_path, &self.manifest_path)
.map_err(|e| anyhow!("Failed to rename manifest: {}", e))?;
Ok(())
}
/// Returns the `logical_len` field of the node segment header, in bytes.
pub fn nodes_logical_len(&self) -> u64 {
    let guard = self.nodes.read();
    let header_bytes = &guard.mmap[0..size_of::<NodesHeader>()];
    from_bytes::<NodesHeader>(header_bytes).logical_len
}
/// Sets the tombstone flag on node `id` and writes the record back in place.
///
/// # Errors
/// Propagates failures from reading or writing the node record.
pub fn tombstone_node(&self, id: u64) -> Result<()> {
    self.get_node_record(id).and_then(|mut rec| {
        rec.flags |= NODE_FLAG_TOMBSTONE;
        self.write_node_at(&rec)
    })
}
pub fn tombstone_edge(&self, src: u64, dst: u64) -> Result<()> {
let mut found = false;
for i in 0..self.edge_count() as u32 {
let edge = self.read_edge_raw(i)?;
if edge.src == src && edge.dst == dst && (edge.flags & EDGE_FLAG_TOMBSTONE) == 0 {
let mut new_edge = edge;
new_edge.flags |= EDGE_FLAG_TOMBSTONE;
self.write_edge_at(i, &new_edge)?;
found = true;
break;
}
}
if !found {
bail!("Edge {} -> {} not found", src, dst);
}
Ok(())
}
pub fn get_edges_for_node(&self, node: &NodeRec) -> Result<Vec<EdgeRec>> {
let mut edges = Vec::new();
for i in 0..node.edge_len {
let idx = node.edge_off + i;
let edge = self.read_edge_raw(idx)?;
edges.push(edge);
}
Ok(edges)
}
/// Deletes the edge `src -> dst` by tombstoning it; see `tombstone_edge`.
///
/// # Errors
/// Returns the error from `tombstone_edge`, e.g. when no live matching edge exists.
pub fn delete_edge(&self, src: u64, dst: u64) -> Result<()> {
self.tombstone_edge(src, dst)
}
pub fn compact_edges(&self) -> Result<CompactionStats> {
let mut new_edges = Vec::new();
let edge_count = self.edge_count();
for i in 0..edge_count as u32 {
let edge = self.read_edge_raw(i)?;
if (edge.flags & EDGE_FLAG_TOMBSTONE) == 0 {
new_edges.push(edge);
}
}
let removed = edge_count - new_edges.len();
let old_bytes = edge_count * size_of::<EdgeRec>();
let new_bytes = new_edges.len() * size_of::<EdgeRec>();
{
let mut edges_lock = self.edges.write();
let total_len = SEGMENT_HEADER_LEN as u64 + new_bytes as u64;
edges_lock.file.set_len(total_len)?;
edges_lock.mmap = unsafe { MmapMut::map_mut(&edges_lock.file)? };
let mut header: EdgesHeader =
*from_bytes(&edges_lock.mmap[0..size_of::<EdgesHeader>()]);
header.logical_len = new_bytes as u64;
edges_lock.mmap[0..size_of::<EdgesHeader>()].copy_from_slice(bytes_of(&header));
for (i, edge) in new_edges.iter().enumerate() {
let offset = SEGMENT_HEADER_LEN + i * size_of::<EdgeRec>();
edges_lock.mmap[offset..offset + size_of::<EdgeRec>()]
.copy_from_slice(bytes_of(edge));
}
}
Ok(CompactionStats {
edges_removed: removed as u32,
space_reclaimed: (old_bytes - new_bytes) as u64,
})
}
/// Persists the manifest with the clean-shutdown flag set.
///
/// Segments and the WAL are not flushed here; call `sync_all` or `checkpoint` first if
/// that is required.
///
/// # Errors
/// Returns the error from `persist_manifest`.
pub fn close(&self) -> Result<()> {
self.persist_manifest(true)
}
}
fn open_segment(path: &Path, initial_size: u64, kind: SegmentKind) -> Result<MappedFile> {
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(path)
.context(format!("Failed to open segment: {:?}", path))?;
let metadata = file.metadata()?;
if metadata.len() == 0 {
let header_size = size_of::<NodesHeader>();
let total_size = initial_size + header_size as u64;
file.set_len(total_size)?;
let header = match kind {
SegmentKind::Nodes => {
let h: NodesHeader = NodesHeader {
magic: 0x4E4F4445,
version: CURRENT_VERSION,
logical_len: 0,
next_id: 1,
reserved: [0u8; 32],
};
bytemuck::bytes_of(&h).to_vec()
}
SegmentKind::Edges => {
let h: EdgesHeader = EdgesHeader {
magic: 0x45444745,
version: CURRENT_VERSION,
logical_len: 0,
reserved: [0u8; 32],
};
bytemuck::bytes_of(&h).to_vec()
}
SegmentKind::Octree => {
let h: OctreeHeader = OctreeHeader {
magic: 0x4F435452,
version: CURRENT_VERSION,
logical_len: 0,
reserved: [0u8; 32],
};
bytemuck::bytes_of(&h).to_vec()
}
SegmentKind::Wal => {
let h: WalHeader = WalHeader {
magic: 0x57414C47,
version: CURRENT_VERSION,
logical_len: 0,
reserved: [0u8; 32],
};
bytemuck::bytes_of(&h).to_vec()
}
};
file.write_all(&header)?;
file.sync_data()?;
}
let mmap = unsafe { MmapMut::map_mut(&file)? };
let current_len = file.metadata()?.len();
Ok(MappedFile {
file,
mmap,
current_len,
})
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    /// Initial size, in bytes, requested for every segment in these tests.
    const SEGMENT_BYTES: u64 = 4096;

    /// Opens (or reopens) a store rooted at `root`, panicking on failure.
    fn open_store(root: &Path) -> GraphStorageManager {
        GraphStorageManager::open(
            root,
            SEGMENT_BYTES,
            SEGMENT_BYTES,
            SEGMENT_BYTES,
            SEGMENT_BYTES,
        )
        .unwrap()
    }

    /// A freshly created store opens successfully and holds no nodes or edges.
    #[test]
    fn test_create_and_open() {
        let dir = tempdir().unwrap();
        let opened = GraphStorageManager::open(
            dir.path(),
            SEGMENT_BYTES,
            SEGMENT_BYTES,
            SEGMENT_BYTES,
            SEGMENT_BYTES,
        );
        assert!(opened.is_ok());
        let sm = opened.unwrap();
        assert_eq!(sm.node_count(), 0);
        assert_eq!(sm.edge_count(), 0);
    }

    /// A node written into a reserved slot reads back with the same ID and position.
    #[test]
    fn test_insert_node() {
        let dir = tempdir().unwrap();
        let sm = open_store(dir.path());
        let id = sm.reserve_node_slot().unwrap();
        // Every field not listed is zero, exactly as in a fully spelled-out literal.
        let node = NodeRec {
            id,
            x: 1.0,
            y: 2.0,
            z: 3.0,
            visibility: 1,
            ..NodeRec::zeroed()
        };
        sm.write_node_at(&node).unwrap();
        let stored = sm.get_node_record(id).unwrap();
        assert_eq!(stored.id, id);
        assert_eq!(stored.x, 1.0);
    }

    /// The first appended edge lands at index 0 and reads back intact.
    #[test]
    fn test_append_edge() {
        let dir = tempdir().unwrap();
        let sm = open_store(dir.path());
        let edge = EdgeRec {
            dst: 1,
            w: 1.5,
            visibility: 1,
            ..EdgeRec::zeroed()
        };
        let idx = sm.append_edge_record(&edge).unwrap();
        assert_eq!(idx, 0);
        let stored = sm.read_edge_raw(idx).unwrap();
        assert_eq!(stored.dst, 1);
    }

    /// An appended and flushed WAL entry is returned by replay.
    #[test]
    fn test_wal_integration() {
        let dir = tempdir().unwrap();
        let sm = open_store(dir.path());
        let entry = WalEntry {
            timestamp: 1,
            node_id: 42,
            edge_dst: 0,
            x: 0.0,
            y: 0.0,
            z: 0.0,
            edge_w: 0.0,
            entry_type: 1,
            _padding: [0u8; 7],
            tx_id: 0,
            lsn: 1,
        };
        sm.append_wal_entry(&entry).unwrap();
        sm.wal_engine.flush().unwrap();
        let replayed = sm.read_wal_entries().unwrap();
        assert_eq!(replayed.len(), 1);
        assert_eq!(replayed[0].node_id, 42);
    }

    /// A node written before a checkpoint is readable after the store is reopened.
    #[test]
    fn test_checkpoint_and_reopen() {
        let dir = tempdir().unwrap();
        {
            let sm = open_store(dir.path());
            let id = sm.reserve_node_slot().unwrap();
            let node = NodeRec {
                id,
                x: 10.0,
                y: 20.0,
                z: 30.0,
                visibility: 1,
                ..NodeRec::zeroed()
            };
            sm.write_node_at(&node).unwrap();
            let lsn = sm.checkpoint().unwrap();
            assert_eq!(sm.get_current_wal_lsn(), lsn);
        }
        let reopened = open_store(dir.path());
        let node = reopened.get_node_record(1).unwrap();
        assert_eq!(node.x, 10.0);
        assert_eq!(node.y, 20.0);
    }

    /// WAL entries flushed by one store instance are replayed by the next one.
    #[test]
    fn test_crash_recovery_wal_replay() {
        let dir = tempdir().unwrap();
        let first = WalEntry {
            timestamp: 1,
            node_id: 10,
            edge_dst: 0,
            x: 1.0,
            y: 2.0,
            z: 3.0,
            edge_w: 0.0,
            entry_type: 1,
            _padding: [0u8; 7],
            tx_id: 1,
            lsn: 1,
        };
        let second = WalEntry {
            timestamp: 2,
            node_id: 20,
            edge_dst: 0,
            x: 4.0,
            y: 5.0,
            z: 6.0,
            edge_w: 0.0,
            entry_type: 1,
            _padding: [0u8; 7],
            tx_id: 2,
            lsn: 2,
        };
        let entries_in = vec![first, second];
        {
            let sm = open_store(dir.path());
            for entry in &entries_in {
                sm.append_wal_entry(entry).unwrap();
            }
            sm.wal_engine.flush().unwrap();
        }
        let sm2 = open_store(dir.path());
        let entries_out = sm2.read_wal_entries().unwrap();
        assert_eq!(entries_out.len(), 2);
        assert_eq!(entries_out[0].node_id, 10);
        assert_eq!(entries_out[1].node_id, 20);
    }

    /// A node that was synced (but not checkpointed) survives a reopen.
    #[test]
    fn test_crash_recovery_node_persisted() {
        let dir = tempdir().unwrap();
        {
            let sm = open_store(dir.path());
            let id = sm.reserve_node_slot().unwrap();
            let node = NodeRec {
                id,
                x: 7.0,
                y: 8.0,
                z: 9.0,
                visibility: 1,
                ..NodeRec::zeroed()
            };
            sm.write_node_at(&node).unwrap();
            sm.sync_all().unwrap();
        }
        let sm2 = open_store(dir.path());
        let node = sm2.get_node_record(1).unwrap();
        assert_eq!(node.x, 7.0);
        assert_eq!(node.y, 8.0);
        assert_eq!(node.z, 9.0);
    }

    /// Closing the store persists a manifest with the clean-shutdown flag set.
    #[test]
    fn test_manifest_clean_shutdown() {
        let dir = tempdir().unwrap();
        {
            let sm = open_store(dir.path());
            sm.close().unwrap();
        }
        let reopened = open_store(dir.path());
        let manifest = reopened.manifest.lock();
        assert_eq!(manifest.clean_shutdown, 1);
    }

    /// An appended edge is still readable after a full sync.
    #[test]
    fn test_sync_all_durability() {
        let dir = tempdir().unwrap();
        let sm = open_store(dir.path());
        let edge = EdgeRec {
            dst: 1,
            w: std::f32::consts::PI,
            visibility: 1,
            ..EdgeRec::zeroed()
        };
        let idx = sm.append_edge_record(&edge).unwrap();
        sm.sync_all().unwrap();
        let stored = sm.read_edge_raw(idx).unwrap();
        assert_eq!(stored.w, std::f32::consts::PI);
    }
}