use super::GraphFolder;
#[cfg(feature = "search")]
use crate::prelude::IndexMutationOps;
use crate::{
db::{
api::{storage::storage::Storage, view::MaterializedGraph},
graph::views::deletion_graph::PersistentGraph,
},
errors::{GraphError, WriteError},
prelude::{AdditionOps, Graph, StableDecode},
serialise::{
serialise::{CacheOps, InternalStableDecode, StableEncode},
ProtoGraph,
},
};
use parking_lot::Mutex;
use prost::Message;
use raphtory_api::core::{
entities::{
properties::prop::{Prop, PropType},
GidRef, EID, VID,
},
storage::{dict_mapper::MaybeNew, timeindex::EventTime},
};
use std::{
fmt::Debug,
io::{Seek, SeekFrom, Write},
mem,
ops::DerefMut,
sync::Arc,
};
use tracing::instrument;
/// Collects graph updates in an in-memory protobuf delta and appends them to the graph file
/// found in `folder` when `write` is called.
#[derive(Debug)]
pub struct GraphWriter {
/// Held while appending to the graph file; the `Arc` is shared with writers created by `fork`.
write_lock: Arc<Mutex<()>>,
/// Updates recorded so far that have not been written yet; `write` drains this.
proto_delta: Mutex<ProtoGraph>,
/// Folder whose appendable graph file receives the encoded delta.
pub(crate) folder: GraphFolder,
}
/// Appends `bytes` to the end of the graph file in `folder`.
///
/// # Errors
///
/// Failures to open the file or to seek to its end are propagated as-is. If the write itself
/// fails, the file is truncated back to the length it had before the write:
/// * `WriteError::WriteError` is returned when that truncation succeeds,
/// * `WriteError::FatalWriteError` (carrying both errors) when the truncation fails as well.
fn try_write(folder: &GraphFolder, bytes: &[u8]) -> Result<(), WriteError> {
    let mut file = folder.get_appendable_graph_file()?;
    // Remember where the file ended so a failed write can be rolled back.
    let start = file.seek(SeekFrom::End(0))?;
    if let Err(write_err) = file.write_all(bytes) {
        let failure = match file.set_len(start) {
            Ok(_) => WriteError::WriteError(write_err),
            Err(reset_err) => WriteError::FatalWriteError(write_err, reset_err),
        };
        return Err(failure);
    }
    Ok(())
}
impl GraphWriter {
    /// Creates a writer for `folder` with an empty pending delta and a fresh write lock.
    ///
    /// The current implementation always returns `Ok`.
    pub fn new(folder: GraphFolder) -> Result<Self, GraphError> {
        let writer = GraphWriter {
            write_lock: Arc::new(Mutex::new(())),
            proto_delta: Mutex::default(),
            folder,
        };
        Ok(writer)
    }

    /// Creates another writer for the same folder that shares this writer's write lock but
    /// starts with its own empty pending delta.
    pub fn fork(&self) -> Self {
        Self {
            folder: self.folder.clone(),
            proto_delta: Mutex::default(),
            write_lock: Arc::clone(&self.write_lock),
        }
    }

    /// Encodes the pending delta and appends it to the graph file.
    ///
    /// The pending delta is taken out (leaving an empty one behind) before encoding. Nothing is
    /// written when the encoding is empty.
    ///
    /// # Errors
    ///
    /// If the append fails, the taken delta is merged with whatever has been recorded in the
    /// meantime and put back, and the write error is returned. If that merge fails to decode,
    /// `GraphError::FatalDecodeError` carrying both errors is returned instead.
    pub fn write(&self) -> Result<(), GraphError> {
        let mut pending = {
            let mut delta = self.proto_delta.lock();
            mem::take(delta.deref_mut())
        };
        let encoded = pending.encode_to_vec();
        if encoded.is_empty() {
            return Ok(());
        }

        // Held until the function returns, including while the delta is restored on failure.
        let _write_guard = self.write_lock.lock();
        let write_err = match try_write(&self.folder, &encoded) {
            Ok(()) => return Ok(()),
            Err(err) => err,
        };

        // Put the unwritten updates back, followed by anything recorded since they were taken.
        let mut current = self.proto_delta.lock();
        let newer = current.encode_to_vec();
        if let Err(decode_err) = pending.merge(newer.as_slice()) {
            return Err(GraphError::FatalDecodeError {
                write_err,
                decode_err,
            });
        }
        *current = pending;
        Err(write_err.into())
    }

    /// Records a layer in the pending delta when `layer_id` is new.
    ///
    /// A `None` layer name is recorded as `"_default"`.
    #[inline]
    pub fn resolve_layer(&self, layer: Option<&str>, layer_id: MaybeNew<usize>) {
        if let MaybeNew::New(id) = layer_id {
            let name = layer.unwrap_or("_default");
            self.proto_delta.lock().new_layer(name, id);
        }
    }

    /// Records a node (with node type id `0`) in the pending delta when `vid` is new.
    pub fn resolve_node(&self, vid: MaybeNew<VID>, gid: GidRef) {
        if let MaybeNew::New(node_id) = vid {
            self.proto_delta.lock().new_node(gid, node_id, 0);
        }
    }

    /// Records the outcome of resolving a node together with its node type.
    ///
    /// * A new node is recorded with the resolved type id.
    /// * An existing node is recorded as a node-type update, but only when the outer
    ///   `node_and_type` value is itself marked as new.
    /// * Afterwards, a new type id is recorded under the name `node_type`.
    pub fn resolve_node_and_type(
        &self,
        node_and_type: MaybeNew<(MaybeNew<VID>, MaybeNew<usize>)>,
        node_type: &str,
        gid: GidRef,
    ) {
        let pair_is_new = matches!(node_and_type, MaybeNew::New(_));
        let (node, type_entry) = node_and_type.inner();
        match node {
            MaybeNew::New(node_id) => {
                self.proto_delta
                    .lock()
                    .new_node(gid, node_id, type_entry.inner());
            }
            MaybeNew::Existing(node_id) if pair_is_new => {
                self.proto_delta
                    .lock()
                    .update_node_type(node_id, type_entry.inner());
            }
            _ => {}
        }
        if let MaybeNew::New(type_id) = type_entry {
            self.proto_delta.lock().new_node_type(node_type, type_id);
        }
    }

    /// Records a graph property in the pending delta when `prop_id` is new.
    ///
    /// `is_static` selects the `cprop` variant, otherwise the `tprop` variant is recorded
    /// together with `dtype`.
    pub fn resolve_graph_property(
        &self,
        prop: &str,
        prop_id: MaybeNew<usize>,
        dtype: PropType,
        is_static: bool,
    ) {
        if let MaybeNew::New(id) = prop_id {
            let mut delta = self.proto_delta.lock();
            if is_static {
                delta.new_graph_cprop(prop, id);
            } else {
                delta.new_graph_tprop(prop, id, &dtype);
            }
        }
    }

    /// Records a node property in the pending delta when `prop_id` is new.
    ///
    /// `is_static` selects the `cprop` variant, otherwise the `tprop` variant.
    pub fn resolve_node_property(
        &self,
        prop: &str,
        prop_id: MaybeNew<usize>,
        dtype: &PropType,
        is_static: bool,
    ) {
        if let MaybeNew::New(id) = prop_id {
            let mut delta = self.proto_delta.lock();
            if is_static {
                delta.new_node_cprop(prop, id, dtype);
            } else {
                delta.new_node_tprop(prop, id, dtype);
            }
        }
    }

    /// Records an edge property in the pending delta when `prop_id` is new.
    ///
    /// `is_static` selects the `cprop` variant, otherwise the `tprop` variant.
    pub fn resolve_edge_property(
        &self,
        prop: &str,
        prop_id: MaybeNew<usize>,
        dtype: &PropType,
        is_static: bool,
    ) {
        if let MaybeNew::New(id) = prop_id {
            let mut delta = self.proto_delta.lock();
            if is_static {
                delta.new_edge_cprop(prop, id, dtype);
            } else {
                delta.new_edge_tprop(prop, id, dtype);
            }
        }
    }

    /// Records `tprop` values `props` for node `v` at time `t`.
    pub fn add_node_update(&self, t: EventTime, v: VID, props: &[(usize, Prop)]) {
        let updates = props.iter().map(|entry| (entry.0, &entry.1));
        self.proto_delta.lock().update_node_tprops(v, t, updates);
    }

    /// Records an edge between `src` and `dst` in the pending delta when `eid` is new.
    pub fn resolve_edge(&self, eid: MaybeNew<EID>, src: VID, dst: VID) {
        if let MaybeNew::New(edge_id) = eid {
            self.proto_delta.lock().new_edge(src, dst, edge_id);
        }
    }

    /// Records `tprop` values `props` for `edge` in `layer` at time `t`.
    pub fn add_edge_update(&self, t: EventTime, edge: EID, props: &[(usize, Prop)], layer: usize) {
        let updates = props.iter().map(|entry| (entry.0, &entry.1));
        self.proto_delta
            .lock()
            .update_edge_tprops(edge, t, layer, updates);
    }

    /// Records graph-level `tprop` values `props` at time `t`.
    pub fn add_graph_tprops(&self, t: EventTime, props: &[(usize, Prop)]) {
        let updates = props.iter().map(|entry| (entry.0, &entry.1));
        self.proto_delta.lock().update_graph_tprops(t, updates);
    }

    /// Records graph-level `cprop` values `props`.
    pub fn add_graph_cprops(&self, props: &[(usize, Prop)]) {
        let updates = props.iter().map(|entry| (entry.0, &entry.1));
        self.proto_delta.lock().update_graph_cprops(updates);
    }

    /// Records `cprop` values `props` for `node`.
    pub fn add_node_cprops(&self, node: VID, props: &[(usize, Prop)]) {
        let updates = props.iter().map(|entry| (entry.0, &entry.1));
        self.proto_delta.lock().update_node_cprops(node, updates);
    }

    /// Records `cprop` values `props` for `edge` in `layer`.
    ///
    /// Unlike the other update methods, nothing is recorded when `props` is empty.
    pub fn add_edge_cprops(&self, edge: EID, layer: usize, props: &[(usize, Prop)]) {
        if props.is_empty() {
            return;
        }
        let updates = props.iter().map(|entry| (entry.0, &entry.1));
        self.proto_delta
            .lock()
            .update_edge_cprops(edge, layer, updates);
    }

    /// Records the deletion of `edge` in `layer` at time `t`.
    pub fn delete_edge(&self, edge: EID, t: EventTime, layer: usize) {
        let mut delta = self.proto_delta.lock();
        delta.del_edge(edge, layer, t);
    }
}
/// Access to the optional [`GraphWriter`] cache attached to a graph.
pub trait InternalCache {
/// Initialises the cache so that it writes to the folder at `path`.
fn init_cache(&self, path: &GraphFolder) -> Result<(), GraphError>;
/// Returns the cache writer, or `None` when there is none.
fn get_cache(&self) -> Option<&GraphWriter>;
}
impl InternalCache for Storage {
    /// Creates the [`GraphWriter`] for `path` unless the cache cell already holds one.
    ///
    /// An existing writer is left untouched.
    fn init_cache(&self, path: &GraphFolder) -> Result<(), GraphError> {
        let make_writer = || GraphWriter::new(path.clone());
        self.cache.get_or_try_init(make_writer).map(|_| ())
    }

    /// Returns the writer stored in the cache cell, if it has been initialised.
    fn get_cache(&self) -> Option<&GraphWriter> {
        let cell = &self.cache;
        cell.get()
    }
}
impl InternalCache for Graph {
    /// Forwards cache initialisation to the inner storage.
    fn init_cache(&self, path: &GraphFolder) -> Result<(), GraphError> {
        let storage = &self.inner;
        storage.init_cache(path)
    }

    /// Forwards the cache lookup to the inner storage.
    fn get_cache(&self) -> Option<&GraphWriter> {
        let storage = &self.inner;
        storage.get_cache()
    }
}
impl InternalCache for PersistentGraph {
    /// Forwards cache initialisation to the wrapped value.
    fn init_cache(&self, path: &GraphFolder) -> Result<(), GraphError> {
        let wrapped = &self.0;
        wrapped.init_cache(path)
    }

    /// Forwards the cache lookup to the wrapped value.
    fn get_cache(&self) -> Option<&GraphWriter> {
        let wrapped = &self.0;
        wrapped.get_cache()
    }
}
impl InternalCache for MaterializedGraph {
    /// Initialises the cache of whichever graph variant is held.
    fn init_cache(&self, path: &GraphFolder) -> Result<(), GraphError> {
        match self {
            Self::EventGraph(events) => events.init_cache(path),
            Self::PersistentGraph(persistent) => persistent.init_cache(path),
        }
    }

    /// Returns the cache writer of whichever graph variant is held.
    fn get_cache(&self) -> Option<&GraphWriter> {
        match self {
            Self::EventGraph(events) => events.get_cache(),
            Self::PersistentGraph(persistent) => persistent.get_cache(),
        }
    }
}
impl<G: InternalCache + InternalStableDecode + StableEncode + AdditionOps> CacheOps for G {
    /// Encodes the graph into the folder at `path`, then initialises the cache for that folder.
    fn cache(&self, path: impl Into<GraphFolder>) -> Result<(), GraphError> {
        let target: GraphFolder = path.into();
        self.encode(&target)?;
        self.init_cache(&target)
    }

    /// Writes the pending updates and the folder metadata (and, with the `search` feature, the
    /// index) to the cache folder.
    ///
    /// Fails with `GraphError::CacheNotInnitialised` when no cache has been set up.
    #[instrument(level = "debug", skip(self))]
    fn write_updates(&self) -> Result<(), GraphError> {
        let writer = match self.get_cache() {
            Some(writer) => writer,
            None => return Err(GraphError::CacheNotInnitialised),
        };
        writer.write()?;
        writer.folder.write_metadata(self)?;
        #[cfg(feature = "search")]
        self.persist_index_to_disk(&writer.folder)?;
        Ok(())
    }

    /// Decodes a graph from the folder at `path` and initialises its cache for that folder.
    ///
    /// Fails with `GraphError::ZippedGraphCannotBeCached` when the folder is a zip.
    fn load_cached(path: impl Into<GraphFolder>) -> Result<Self, GraphError> {
        let source: GraphFolder = path.into();
        if !source.is_zip() {
            let graph = Self::decode(&source)?;
            graph.init_cache(&source)?;
            return Ok(graph);
        }
        Err(GraphError::ZippedGraphCannotBeCached)
    }
}
#[cfg(test)]
mod test {
    use crate::serialise::{incremental::GraphWriter, GraphFolder};
    use raphtory_api::core::{
        entities::{GidRef, VID},
        storage::dict_mapper::MaybeNew,
        utils::logging::global_info_logger,
    };
    use std::fs::File;
    use tempfile::TempDir;

    /// A failed write must leave the recorded node in the pending delta.
    #[test]
    fn test_write_failure() {
        global_info_logger();
        let scratch = TempDir::new().unwrap();
        let folder = GraphFolder::from(scratch.path());

        // Create the graph file up front and mark it read-only so that the write fails.
        let graph_file = File::create(folder.get_graph_path()).unwrap();
        let mut read_only = graph_file.metadata().unwrap().permissions();
        read_only.set_readonly(true);
        graph_file.set_permissions(read_only).unwrap();

        let writer = GraphWriter::new(folder).unwrap();
        writer.resolve_node(MaybeNew::New(VID(0)), GidRef::Str("0"));
        assert_eq!(writer.proto_delta.lock().nodes.len(), 1);

        let outcome = writer.write();
        assert!(outcome.is_err());
        // The node is still pending after the failed write.
        assert_eq!(writer.proto_delta.lock().nodes.len(), 1);
    }
}