#[macro_use]
extern crate wascc_codec as codec;
extern crate wasccgraph_common as common;
#[macro_use]
extern crate log;
use codec::capabilities::{
CapabilityDescriptor, CapabilityProvider, Dispatcher, NullDispatcher, OperationDirection,
OP_GET_CAPABILITY_DESCRIPTOR,
};
use codec::core::{CapabilityConfiguration, OP_BIND_ACTOR, OP_REMOVE_ACTOR};
use codec::{deserialize, serialize};
use std::error::Error;
use std::{
collections::HashMap,
sync::{Arc, RwLock},
};
use common::protocol::*;
use redis::Connection;
use redis::RedisResult;
use redisgraph::{Graph, RedisGraphResult, ResultSet};
mod rgraph;
const SYSTEM_ACTOR: &str = "system";
const VERSION: &str = env!("CARGO_PKG_VERSION");
const REVISION: u32 = 2;
#[cfg(not(feature = "static_plugin"))]
capability_provider!(WasccRedisgraphProvider, WasccRedisgraphProvider::new);
pub struct WasccRedisgraphProvider {
dispatcher: RwLock<Box<dyn Dispatcher>>,
clients: Arc<RwLock<HashMap<String, redis::Client>>>,
}
impl Default for WasccRedisgraphProvider {
fn default() -> Self {
let _ = env_logger::builder().format_module_path(false).try_init();
WasccRedisgraphProvider {
dispatcher: RwLock::new(Box::new(NullDispatcher::new())),
clients: Arc::new(RwLock::new(HashMap::new())),
}
}
}
impl WasccRedisgraphProvider {
pub fn new() -> Self {
Self::default()
}
fn query_graph(&self, actor: &str, query: QueryRequest) -> Result<Vec<u8>, Box<dyn Error>> {
trace!("Querying graph database: {:?}", query);
let mut g = self.open_graph(actor, &query.graph_name)?;
let rs: RedisGraphResult<ResultSet> = g.query(&query.query);
match rs {
Ok(rs) => Ok(serialize(&to_common_resultset(rs)?)?),
Err(e) => Err(format!("Graph query failure: {:?}", e).into()),
}
}
fn delete_graph(&self, actor: &str, delete: DeleteRequest) -> Result<Vec<u8>, Box<dyn Error>> {
let g = self.open_graph(actor, &delete.graph_name)?; let rs: RedisGraphResult<()> = g.delete();
match rs {
Ok(_) => Ok(vec![]),
Err(e) => Err(format!("Failed to delete graph: {:?}", e).into()),
}
}
fn deconfigure(&self, actor: &str) -> Result<Vec<u8>, Box<dyn Error>> {
if self.clients.write().unwrap().remove(actor).is_none() {
warn!("Attempted to de-configure non-existent actor: {}", actor);
}
Ok(vec![])
}
fn configure(&self, config: CapabilityConfiguration) -> Result<Vec<u8>, Box<dyn Error>> {
trace!("Configuring provider for {}", &config.module);
let c = rgraph::initialize_client(config.clone())?;
self.clients.write().unwrap().insert(config.module, c);
Ok(vec![])
}
fn actor_con(&self, actor: &str) -> RedisResult<Connection> {
let lock = self.clients.read().unwrap();
if let Some(client) = lock.get(actor) {
client.get_connection()
} else {
Err(redis::RedisError::from((
redis::ErrorKind::InvalidClientConfig,
"No client for this actor. Did the host configure it?",
)))
}
}
fn open_graph(&self, actor: &str, graph: &str) -> Result<Graph, Box<dyn Error>> {
let conn = self.actor_con(actor)?;
let g = rgraph::open_graph(conn, &graph)?;
Ok(g)
}
fn get_descriptor(&self) -> Result<Vec<u8>, Box<dyn Error>> {
Ok(serialize(
CapabilityDescriptor::builder()
.id(common::CAPID_GRAPHDB)
.name("waSCC Graph Database Provider (RedisGraph)")
.long_description("A capability provider exposing Cypher-based RedisGraph database access to waSCC actors")
.version(VERSION)
.revision(REVISION)
.with_operation(
OP_QUERY,
OperationDirection::ToProvider,
"Executes a Cypher query against the database and returns the results"
)
.with_operation(
OP_DELETE,
OperationDirection::ToProvider,
"Deletes a graph database"
)
.build()
)?)
}
}
fn to_common_resultset(rs: redisgraph::ResultSet) -> Result<common::ResultSet, Box<dyn Error>> {
let input = serialize(&rs)?;
let output: common::ResultSet = deserialize(&input)?;
Ok(output)
}
impl CapabilityProvider for WasccRedisgraphProvider {
fn configure_dispatch(&self, dispatcher: Box<dyn Dispatcher>) -> Result<(), Box<dyn Error>> {
trace!("Dispatcher received.");
let mut lock = self.dispatcher.write().unwrap();
*lock = dispatcher;
Ok(())
}
fn handle_call(&self, actor: &str, op: &str, msg: &[u8]) -> Result<Vec<u8>, Box<dyn Error>> {
trace!("Received host call from {}, operation - {}", actor, op);
match op {
OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
OP_QUERY => self.query_graph(actor, deserialize(msg)?),
OP_DELETE => self.delete_graph(actor, deserialize(msg)?),
OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => self.deconfigure(actor),
OP_GET_CAPABILITY_DESCRIPTOR if actor == SYSTEM_ACTOR => self.get_descriptor(),
_ => Err("bad dispatch".into()),
}
}
}