use crate::{
client::{FalkorClientProvider, ProvidesSyncConnections},
connection::blocking::{BorrowedSyncConnection, FalkorSyncConnection},
parser::{parse_config_hashmap, redis_value_as_untyped_string_vec},
ConfigValue, FalkorConnectionInfo, FalkorDBError, FalkorResult, SyncGraph,
};
use parking_lot::Mutex;
use std::{
collections::HashMap,
sync::{mpsc, Arc},
};
pub(crate) struct FalkorSyncClientInner {
_inner: Mutex<FalkorClientProvider>,
connection_pool_size: u8,
connection_pool_tx: mpsc::SyncSender<FalkorSyncConnection>,
connection_pool_rx: Mutex<mpsc::Receiver<FalkorSyncConnection>>,
}
impl FalkorSyncClientInner {
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "Borrow Connection From Connection Pool",
skip_all,
level = "debug"
)
)]
pub(crate) fn borrow_connection(
&self,
pool_owner: Arc<Self>,
) -> FalkorResult<BorrowedSyncConnection> {
Ok(BorrowedSyncConnection::new(
self.connection_pool_rx
.lock()
.recv()
.map_err(|_| FalkorDBError::EmptyConnection)?,
self.connection_pool_tx.clone(),
pool_owner,
))
}
}
impl ProvidesSyncConnections for FalkorSyncClientInner {
#[cfg_attr(
feature = "tracing",
tracing::instrument(
name = "Get New Sync Connection From Client",
skip_all,
level = "info"
)
)]
fn get_connection(&self) -> FalkorResult<FalkorSyncConnection> {
self._inner.lock().get_connection()
}
}
#[derive(Clone)]
pub struct FalkorSyncClient {
inner: Arc<FalkorSyncClientInner>,
_connection_info: FalkorConnectionInfo,
}
impl FalkorSyncClient {
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Create Sync Client", skip_all, level = "info")
)]
pub(crate) fn create(
mut client: FalkorClientProvider,
connection_info: FalkorConnectionInfo,
num_connections: u8,
) -> FalkorResult<Self> {
let (connection_pool_tx, connection_pool_rx) = mpsc::sync_channel(num_connections as usize);
for _ in 0..num_connections {
let new_conn = client
.get_connection()
.map_err(|err| FalkorDBError::RedisError(err.to_string()))?;
connection_pool_tx
.send(new_conn)
.map_err(|_| FalkorDBError::EmptyConnection)?;
}
Ok(Self {
inner: Arc::new(FalkorSyncClientInner {
_inner: client.into(),
connection_pool_size: num_connections,
connection_pool_tx,
connection_pool_rx: Mutex::new(connection_pool_rx),
}),
_connection_info: connection_info,
})
}
pub fn connection_pool_size(&self) -> u8 {
self.inner.connection_pool_size
}
pub(crate) fn borrow_connection(&self) -> FalkorResult<BorrowedSyncConnection> {
self.inner.borrow_connection(self.inner.clone())
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "List Graphs", skip_all, level = "info")
)]
pub fn list_graphs(&self) -> FalkorResult<Vec<String>> {
let mut conn = self.borrow_connection()?;
conn.execute_command(None, "GRAPH.LIST", None, None)
.and_then(redis_value_as_untyped_string_vec)
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Get Config Value", skip_all, level = "info")
)]
pub fn config_get(
&self,
config_key: &str,
) -> FalkorResult<HashMap<String, ConfigValue>> {
self.borrow_connection()
.and_then(|mut conn| {
conn.execute_command(None, "GRAPH.CONFIG", Some("GET"), Some(&[config_key]))
})
.and_then(parse_config_hashmap)
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Set Config Value", skip_all, level = "info")
)]
pub fn config_set<C: Into<ConfigValue>>(
&self,
config_key: &str,
value: C,
) -> FalkorResult<redis::Value> {
self.borrow_connection().and_then(|mut conn| {
conn.execute_command(
None,
"GRAPH.CONFIG",
Some("SET"),
Some(&[config_key, value.into().to_string().as_str()]),
)
})
}
pub fn select_graph<T: ToString>(
&self,
graph_name: T,
) -> SyncGraph {
SyncGraph::new(self.inner.clone(), graph_name)
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Copy Graph", skip_all, level = "info")
)]
pub fn copy_graph(
&self,
graph_to_clone: &str,
new_graph_name: &str,
) -> FalkorResult<SyncGraph> {
self.borrow_connection()?.execute_command(
Some(graph_to_clone),
"GRAPH.COPY",
None,
Some(&[new_graph_name]),
)?;
Ok(self.select_graph(new_graph_name))
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(name = "Client Get Redis Info", skip_all, level = "info")
)]
pub fn redis_info(
&self,
section: Option<&str>,
) -> FalkorResult<HashMap<String, String>> {
self.borrow_connection()?
.as_inner()?
.get_redis_info(section)
}
}
#[cfg(test)]
pub(crate) fn create_empty_inner_sync_client() -> Arc<FalkorSyncClientInner> {
let (tx, rx) = mpsc::sync_channel(1);
tx.send(FalkorSyncConnection::None).ok();
Arc::new(FalkorSyncClientInner {
_inner: Mutex::new(FalkorClientProvider::None),
connection_pool_size: 0,
connection_pool_tx: tx,
connection_pool_rx: Mutex::new(rx),
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
test_utils::{create_test_client, TestSyncGraphHandle},
FalkorClientBuilder,
};
use std::{mem, num::NonZeroU8, sync::mpsc::TryRecvError, thread};
#[test]
fn test_borrow_connection() {
let client = FalkorClientBuilder::new()
.with_num_connections(NonZeroU8::new(6).expect("Could not create a perfectly valid u8"))
.build()
.expect("Could not create client for this test");
let _conn_vec: Vec<FalkorResult<BorrowedSyncConnection>> = (0..6)
.map(|_| {
let conn = client.borrow_connection();
assert!(conn.is_ok());
conn
})
.collect();
let non_existing_conn = client.inner.connection_pool_rx.lock().try_recv();
assert!(non_existing_conn.is_err());
let Err(TryRecvError::Empty) = non_existing_conn else {
panic!("Got error, but not a TryRecvError::Empty, as expected");
};
}
#[test]
fn test_list_graphs() {
let client = create_test_client();
let res = client.list_graphs();
assert!(res.is_ok());
let graphs = res.unwrap();
assert!(graphs.contains(&"imdb".to_string()));
}
#[test]
fn test_select_graph_and_query() {
let client = create_test_client();
let mut graph = client.select_graph("imdb");
assert_eq!(graph.graph_name(), "imdb".to_string());
let res = graph
.query("MATCH (a:actor) return a")
.execute()
.expect("Could not get actors from unmodified graph");
assert_eq!(res.data.collect::<Vec<_>>().len(), 1317);
}
#[test]
fn test_copy_graph() {
let client = create_test_client();
client.select_graph("imdb_ro_copy").delete().ok();
let graph = client.copy_graph("imdb", "imdb_ro_copy");
assert!(graph.is_ok());
let mut graph = TestSyncGraphHandle {
inner: graph.unwrap(),
};
let mut original_graph = client.select_graph("imdb");
assert_eq!(
graph
.inner
.query("MATCH (a:actor) RETURN a")
.execute()
.expect("Could not get actors from unmodified graph")
.data
.collect::<Vec<_>>(),
original_graph
.query("MATCH (a:actor) RETURN a")
.execute()
.expect("Could not get actors from unmodified graph")
.data
.collect::<Vec<_>>()
)
}
#[test]
fn test_get_config() {
let client = create_test_client();
let config = client
.config_get("QUERY_MEM_CAPACITY")
.expect("Could not get configuration");
assert_eq!(config.len(), 1);
assert!(config.contains_key("QUERY_MEM_CAPACITY"));
assert_eq!(
mem::discriminant(config.get("QUERY_MEM_CAPACITY").unwrap()),
mem::discriminant(&ConfigValue::Int64(0))
);
}
#[test]
fn test_get_config_all() {
let client = create_test_client();
let configuration = client.config_get("*").expect("Could not get configuration");
assert_eq!(
configuration.get("THREAD_COUNT").cloned().unwrap(),
ConfigValue::Int64(thread::available_parallelism().unwrap().get() as i64)
);
}
#[test]
fn test_set_config() {
let client = create_test_client();
let config = client
.config_get("DELTA_MAX_PENDING_CHANGES")
.expect("Could not get configuration");
let current_val = config
.get("DELTA_MAX_PENDING_CHANGES")
.cloned()
.unwrap()
.as_i64()
.unwrap();
let desired_val = if current_val == 10000 { 50000 } else { 10000 };
client
.config_set("DELTA_MAX_PENDING_CHANGES", desired_val)
.expect("Could not set config value");
let new_config = client
.config_get("DELTA_MAX_PENDING_CHANGES")
.expect("Could not get configuration");
assert_eq!(
new_config
.get("DELTA_MAX_PENDING_CHANGES")
.cloned()
.unwrap()
.as_i64()
.unwrap(),
desired_val
);
client
.config_set("DELTA_MAX_PENDING_CHANGES", current_val)
.ok();
}
}