use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::sync::{Arc, OnceLock};
use tokio::sync::RwLock;
static GLOBAL_SHARED_STORAGE: OnceLock<Arc<RwLock<RdfApp>>> = OnceLock::new();
pub fn init_global_shared_storage() -> Arc<RwLock<RdfApp>> {
GLOBAL_SHARED_STORAGE
.get_or_init(|| Arc::new(RwLock::new(RdfApp::default())))
.clone()
}
pub fn get_global_shared_storage() -> Option<Arc<RwLock<RdfApp>>> {
GLOBAL_SHARED_STORAGE.get().cloned()
}
pub async fn reset_global_shared_storage() {
if let Some(storage) = GLOBAL_SHARED_STORAGE.get() {
let mut state = storage.write().await;
*state = RdfApp::default();
}
}
#[cfg(feature = "raft")]
use openraft::{
BasicNode, Entry, EntryPayload, LogId, Membership, Raft, RaftMetrics, SnapshotMeta,
StorageError,
};
pub type OxirsNodeId = u64;
pub type OxirsRequestId = u64;
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum RdfCommand {
Insert {
subject: String,
predicate: String,
object: String,
},
Delete {
subject: String,
predicate: String,
object: String,
},
Clear,
BeginTransaction { tx_id: String },
CommitTransaction { tx_id: String },
RollbackTransaction { tx_id: String },
AddNode {
node_id: OxirsNodeId,
address: String,
},
RemoveNode { node_id: OxirsNodeId },
TransferLeadership { target_node: OxirsNodeId },
ForceEvictNode { node_id: OxirsNodeId },
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum RdfResponse {
Success,
Error(String),
TransactionStarted { tx_id: String },
TransactionCommitted { tx_id: String },
TransactionRolledBack { tx_id: String },
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
pub struct RdfApp {
pub triples: BTreeSet<(String, String, String)>,
pub transactions: BTreeMap<String, BTreeSet<(String, String, String)>>,
pub shards: BTreeMap<crate::shard::ShardId, BTreeSet<(String, String, String)>>,
pub deleted_shards: BTreeSet<crate::shard::ShardId>,
}
impl RdfApp {
pub fn apply_command(&mut self, cmd: &RdfCommand) -> RdfResponse {
match cmd {
RdfCommand::Insert {
subject,
predicate,
object,
} => {
self.triples
.insert((subject.clone(), predicate.clone(), object.clone()));
RdfResponse::Success
}
RdfCommand::Delete {
subject,
predicate,
object,
} => {
self.triples
.remove(&(subject.clone(), predicate.clone(), object.clone()));
RdfResponse::Success
}
RdfCommand::Clear => {
self.triples.clear();
RdfResponse::Success
}
RdfCommand::BeginTransaction { tx_id } => {
self.transactions.insert(tx_id.clone(), BTreeSet::new());
RdfResponse::TransactionStarted {
tx_id: tx_id.clone(),
}
}
RdfCommand::CommitTransaction { tx_id } => {
if let Some(tx_triples) = self.transactions.remove(tx_id) {
self.triples.extend(tx_triples);
RdfResponse::TransactionCommitted {
tx_id: tx_id.clone(),
}
} else {
RdfResponse::Error(format!("Transaction {tx_id} not found"))
}
}
RdfCommand::RollbackTransaction { tx_id } => {
if self.transactions.remove(tx_id).is_some() {
RdfResponse::TransactionRolledBack {
tx_id: tx_id.clone(),
}
} else {
RdfResponse::Error(format!("Transaction {tx_id} not found"))
}
}
RdfCommand::AddNode { node_id, address } => {
tracing::info!("Adding node {} at address {} to cluster", node_id, address);
RdfResponse::Success
}
RdfCommand::RemoveNode { node_id } => {
tracing::info!("Removing node {} from cluster", node_id);
RdfResponse::Success
}
RdfCommand::TransferLeadership { target_node } => {
tracing::info!("Transferring leadership to node {}", target_node);
RdfResponse::Success
}
RdfCommand::ForceEvictNode { node_id } => {
tracing::warn!("Force evicting node {} from cluster", node_id);
RdfResponse::Success
}
}
}
pub fn len(&self) -> usize {
self.triples.len()
}
pub fn is_empty(&self) -> bool {
self.triples.is_empty()
}
pub fn query(
&self,
subject: Option<&str>,
predicate: Option<&str>,
object: Option<&str>,
) -> Vec<(String, String, String)> {
self.triples
.iter()
.filter(|(s, p, o)| {
subject.map_or(true, |subj| s == subj)
&& predicate.map_or(true, |pred| p == pred)
&& object.map_or(true, |obj| o == obj)
})
.cloned()
.collect()
}
pub fn create_shard(&mut self, shard_id: crate::shard::ShardId) {
self.shards.insert(shard_id, BTreeSet::new());
}
pub fn delete_shard(&mut self, shard_id: crate::shard::ShardId) {
self.shards.remove(&shard_id);
self.deleted_shards.remove(&shard_id);
}
pub fn insert_triple_to_shard(
&mut self,
shard_id: crate::shard::ShardId,
triple: oxirs_core::model::Triple,
) {
let triple_tuple = (
triple.subject().to_string(),
triple.predicate().to_string(),
triple.object().to_string(),
);
let shard = self.shards.entry(shard_id).or_default();
shard.insert(triple_tuple);
}
pub fn delete_triple_from_shard(
&mut self,
shard_id: crate::shard::ShardId,
triple: &oxirs_core::model::Triple,
) {
let triple_tuple = (
triple.subject().to_string(),
triple.predicate().to_string(),
triple.object().to_string(),
);
if let Some(shard) = self.shards.get_mut(&shard_id) {
shard.remove(&triple_tuple);
}
}
pub fn query_shard(
&self,
shard_id: crate::shard::ShardId,
subject: Option<&str>,
predicate: Option<&str>,
object: Option<&str>,
) -> Vec<oxirs_core::model::Triple> {
if let Some(shard) = self.shards.get(&shard_id) {
shard
.iter()
.filter(|(s, p, o)| {
subject.map_or(true, |subj| s == subj)
&& predicate.map_or(true, |pred| p == pred)
&& object.map_or(true, |obj| o == obj)
})
.filter_map(|(s, p, o)| {
use oxirs_core::model::{Literal, NamedNode, Triple};
if let (Ok(subj), Ok(pred)) = (NamedNode::new(s), NamedNode::new(p)) {
if let Ok(obj_node) = NamedNode::new(o) {
Some(Triple::new(subj, pred, obj_node))
} else {
Some(Triple::new(subj, pred, Literal::new_simple_literal(o)))
}
} else {
None
}
})
.collect()
} else {
Vec::new()
}
}
pub fn get_shard_size(&self, shard_id: crate::shard::ShardId) -> u64 {
if let Some(shard) = self.shards.get(&shard_id) {
(shard.len() * 100) as u64
} else {
0
}
}
pub fn get_shard_triple_count(&self, shard_id: crate::shard::ShardId) -> usize {
self.shards.get(&shard_id).map_or(0, |shard| shard.len())
}
pub fn export_shard(&self, shard_id: crate::shard::ShardId) -> Vec<oxirs_core::model::Triple> {
self.query_shard(shard_id, None, None, None)
}
pub fn import_shard(
&mut self,
shard_id: crate::shard::ShardId,
triples: Vec<oxirs_core::model::Triple>,
) {
let shard = self.shards.entry(shard_id).or_default();
for triple in triples {
let triple_tuple = (
triple.subject().to_string(),
triple.predicate().to_string(),
triple.object().to_string(),
);
shard.insert(triple_tuple);
}
}
pub fn get_shard_triples(
&self,
shard_id: crate::shard::ShardId,
) -> Vec<oxirs_core::model::Triple> {
self.export_shard(shard_id)
}
pub fn insert_triples_to_shard(
&mut self,
shard_id: crate::shard::ShardId,
triples: Vec<oxirs_core::model::Triple>,
) {
let shard = self.shards.entry(shard_id).or_default();
for triple in triples {
let triple_tuple = (
triple.subject().to_string(),
triple.predicate().to_string(),
triple.object().to_string(),
);
shard.insert(triple_tuple);
}
}
pub fn mark_shard_for_deletion(&mut self, shard_id: crate::shard::ShardId) {
self.deleted_shards.insert(shard_id);
}
}
#[cfg(feature = "raft")]
mod raft_impl {
use super::*;
use openraft::{
storage::{LogState, Snapshot},
ErrorSubject, ErrorVerb, RaftLogReader, RaftSnapshotBuilder, RaftStorage, StorageIOError,
};
use std::io::Cursor;
#[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)]
pub struct OxirsTypeConfig;
impl openraft::RaftTypeConfig for OxirsTypeConfig {
type D = RdfCommand;
type R = RdfResponse;
type NodeId = OxirsNodeId;
type Node = BasicNode;
type Entry = Entry<Self>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = openraft::TokioRuntime;
type Responder = openraft::impls::OneshotResponder<Self>;
}
pub struct OxirsStorage {
pub state: Arc<RwLock<RdfApp>>,
pub log: Arc<RwLock<Vec<Entry<OxirsTypeConfig>>>>,
pub hard_state: Arc<RwLock<(u64, Option<OxirsNodeId>, Option<LogId<OxirsNodeId>>)>>,
pub last_applied: Arc<RwLock<Option<LogId<OxirsNodeId>>>>,
pub snapshot: Arc<RwLock<Option<Snapshot<OxirsTypeConfig>>>>,
}
impl OxirsStorage {
pub fn new() -> Self {
Self {
state: Arc::new(RwLock::new(RdfApp::default())),
log: Arc::new(RwLock::new(Vec::new())),
hard_state: Arc::new(RwLock::new((0, None, None))),
last_applied: Arc::new(RwLock::new(None)),
snapshot: Arc::new(RwLock::new(None)),
}
}
}
impl RaftLogReader<OxirsTypeConfig> for OxirsStorage {
async fn try_get_log_entries<
RB: std::ops::RangeBounds<u64> + Clone + std::fmt::Debug + Send,
>(
&mut self,
range: RB,
) -> Result<Vec<Entry<OxirsTypeConfig>>, StorageError<OxirsNodeId>> {
let log = self.log.read().await;
let start = match range.start_bound() {
std::ops::Bound::Included(&n) => n,
std::ops::Bound::Excluded(&n) => n + 1,
std::ops::Bound::Unbounded => 0,
};
let end = match range.end_bound() {
std::ops::Bound::Included(&n) => n + 1,
std::ops::Bound::Excluded(&n) => n,
std::ops::Bound::Unbounded => u64::MAX,
};
let entries = log
.iter()
.filter(|entry| entry.log_id.index >= start && entry.log_id.index < end)
.cloned()
.collect();
Ok(entries)
}
}
impl RaftSnapshotBuilder<OxirsTypeConfig> for OxirsStorage {
async fn build_snapshot(
&mut self,
) -> Result<Snapshot<OxirsTypeConfig>, StorageError<OxirsNodeId>> {
let state = self.state.read().await;
let last_applied = *self.last_applied.read().await;
let data = serde_json::to_vec(&*state).map_err(|e| StorageError::IO {
source: StorageIOError::new(
ErrorSubject::StateMachine,
ErrorVerb::Write,
openraft::AnyError::new(&e),
),
})?;
let snapshot = Snapshot {
meta: SnapshotMeta {
last_log_id: last_applied,
last_membership: openraft::StoredMembership::new(
None,
Membership::new(vec![BTreeSet::new()], None),
),
snapshot_id: format!("snapshot-{}", last_applied.map_or(0, |id| id.index)),
},
snapshot: Box::new(Cursor::new(data)),
};
*self.snapshot.write().await = Some(snapshot.clone());
Ok(snapshot)
}
}
impl RaftStorage<OxirsTypeConfig> for OxirsStorage {
type LogReader = Self;
type SnapshotBuilder = Self;
async fn save_committed(
&mut self,
committed: Option<LogId<OxirsNodeId>>,
) -> Result<(), StorageError<OxirsNodeId>> {
self.hard_state.write().await.2 = committed;
Ok(())
}
async fn read_committed(
&mut self,
) -> Result<Option<LogId<OxirsNodeId>>, StorageError<OxirsNodeId>> {
Ok(self.hard_state.read().await.2)
}
async fn save_vote(
&mut self,
vote: &openraft::Vote<OxirsNodeId>,
) -> Result<(), StorageError<OxirsNodeId>> {
let mut hard_state = self.hard_state.write().await;
hard_state.0 = vote.leader_id.term;
hard_state.1 = vote.leader_id.voted_for();
Ok(())
}
async fn read_vote(
&mut self,
) -> Result<Option<openraft::Vote<OxirsNodeId>>, StorageError<OxirsNodeId>> {
let hard_state = self.hard_state.read().await;
if let Some(node_id) = hard_state.1 {
Ok(Some(openraft::Vote::new(hard_state.0, node_id)))
} else {
Ok(None)
}
}
async fn get_log_reader(&mut self) -> Self::LogReader {
self.clone()
}
async fn get_log_state(
&mut self,
) -> Result<LogState<OxirsTypeConfig>, StorageError<OxirsNodeId>> {
let log = self.log.read().await;
let last_log_id = log.last().map(|entry| entry.log_id);
let last_purged_log_id = None;
Ok(LogState {
last_purged_log_id,
last_log_id,
})
}
async fn append_to_log<I>(&mut self, entries: I) -> Result<(), StorageError<OxirsNodeId>>
where
I: IntoIterator<Item = Entry<OxirsTypeConfig>> + Send,
{
let mut log = self.log.write().await;
log.extend(entries);
Ok(())
}
async fn delete_conflict_logs_since(
&mut self,
log_id: LogId<OxirsNodeId>,
) -> Result<(), StorageError<OxirsNodeId>> {
let mut log = self.log.write().await;
log.retain(|entry| entry.log_id.index < log_id.index);
Ok(())
}
async fn purge_logs_upto(
&mut self,
log_id: LogId<OxirsNodeId>,
) -> Result<(), StorageError<OxirsNodeId>> {
let mut log = self.log.write().await;
log.retain(|entry| entry.log_id.index > log_id.index);
Ok(())
}
async fn last_applied_state(
&mut self,
) -> Result<
(
Option<LogId<OxirsNodeId>>,
openraft::StoredMembership<OxirsNodeId, BasicNode>,
),
StorageError<OxirsNodeId>,
> {
let last_applied = *self.last_applied.read().await;
let membership =
openraft::StoredMembership::new(None, Membership::new(vec![BTreeSet::new()], None));
Ok((last_applied, membership))
}
async fn apply_to_state_machine(
&mut self,
entries: &[Entry<OxirsTypeConfig>],
) -> Result<Vec<RdfResponse>, StorageError<OxirsNodeId>> {
let mut responses = Vec::new();
let mut state = self.state.write().await;
for entry in entries {
if let EntryPayload::Normal(cmd) = &entry.payload {
let response = state.apply_command(cmd);
responses.push(response);
}
*self.last_applied.write().await = Some(entry.log_id);
}
Ok(responses)
}
async fn get_snapshot_builder(&mut self) -> Self::SnapshotBuilder {
self.clone()
}
async fn begin_receiving_snapshot(
&mut self,
) -> Result<Box<Cursor<Vec<u8>>>, StorageError<OxirsNodeId>> {
Ok(Box::new(Cursor::new(Vec::new())))
}
async fn install_snapshot(
&mut self,
meta: &SnapshotMeta<OxirsNodeId, BasicNode>,
snapshot: Box<Cursor<Vec<u8>>>,
) -> Result<(), StorageError<OxirsNodeId>> {
let data = snapshot.get_ref();
let new_state: RdfApp = serde_json::from_slice(data).map_err(|e| StorageError::IO {
source: StorageIOError::new(
ErrorSubject::StateMachine,
ErrorVerb::Read,
openraft::AnyError::new(&e),
),
})?;
*self.state.write().await = new_state;
*self.last_applied.write().await = meta.last_log_id;
Ok(())
}
async fn get_current_snapshot(
&mut self,
) -> Result<Option<Snapshot<OxirsTypeConfig>>, StorageError<OxirsNodeId>> {
Ok(self.snapshot.read().await.clone())
}
}
impl Clone for OxirsStorage {
fn clone(&self) -> Self {
Self {
state: Arc::clone(&self.state),
log: Arc::clone(&self.log),
hard_state: Arc::clone(&self.hard_state),
last_applied: Arc::clone(&self.last_applied),
snapshot: Arc::clone(&self.snapshot),
}
}
}
}
#[cfg(feature = "raft")]
pub use raft_impl::*;
pub struct RaftNode {
node_id: OxirsNodeId,
#[cfg(feature = "raft")]
raft: Option<Raft<OxirsTypeConfig>>,
storage: Arc<RwLock<RdfApp>>,
}
impl RaftNode {
pub fn new(node_id: OxirsNodeId) -> Self {
Self {
node_id,
#[cfg(feature = "raft")]
raft: None,
storage: Arc::new(RwLock::new(RdfApp::default())),
}
}
#[cfg(feature = "raft")]
pub async fn init_raft(&mut self, _peers: BTreeSet<OxirsNodeId>) -> Result<()> {
tracing::warn!(
"Raft initialization incomplete for OpenRaft 0.9.21 - using fallback storage mode"
);
Ok(())
}
pub async fn is_leader(&self) -> bool {
#[cfg(feature = "raft")]
{
if let Some(ref raft) = self.raft {
match raft.metrics().borrow().current_leader {
Some(leader) => leader == self.node_id,
None => false,
}
} else {
true
}
}
#[cfg(not(feature = "raft"))]
{
true
}
}
pub async fn current_term(&self) -> u64 {
#[cfg(feature = "raft")]
{
if let Some(ref raft) = self.raft {
raft.metrics().borrow().current_term
} else {
0
}
}
#[cfg(not(feature = "raft"))]
{
0
}
}
pub async fn submit_command(&self, cmd: RdfCommand) -> Result<RdfResponse> {
#[cfg(feature = "raft")]
{
if let Some(ref raft) = self.raft {
let response = raft
.client_write(cmd)
.await
.map_err(|e| anyhow::anyhow!("Failed to submit command: {}", e))?;
Ok(response.data)
} else {
if let Some(shared_storage) = get_global_shared_storage() {
let mut state = shared_storage.write().await;
Ok(state.apply_command(&cmd))
} else {
let mut state = self.storage.write().await;
Ok(state.apply_command(&cmd))
}
}
}
#[cfg(not(feature = "raft"))]
{
if let Some(shared_storage) = get_global_shared_storage() {
let mut state = shared_storage.write().await;
Ok(state.apply_command(&cmd))
} else {
let mut state = self.storage.write().await;
Ok(state.apply_command(&cmd))
}
}
}
#[cfg(feature = "raft")]
pub async fn get_metrics(&self) -> Option<RaftMetrics<OxirsNodeId, BasicNode>> {
self.raft
.as_ref()
.map(|raft| raft.metrics().borrow().clone())
}
pub async fn query(
&self,
subject: Option<&str>,
predicate: Option<&str>,
object: Option<&str>,
) -> Vec<(String, String, String)> {
if let Some(shared_storage) = get_global_shared_storage() {
let state = shared_storage.read().await;
state.query(subject, predicate, object)
} else {
let state = self.storage.read().await;
state.query(subject, predicate, object)
}
}
pub async fn len(&self) -> usize {
if let Some(shared_storage) = get_global_shared_storage() {
let state = shared_storage.read().await;
state.len()
} else {
let state = self.storage.read().await;
state.len()
}
}
pub async fn is_empty(&self) -> bool {
if let Some(shared_storage) = get_global_shared_storage() {
let state = shared_storage.read().await;
state.is_empty()
} else {
let state = self.storage.read().await;
state.is_empty()
}
}
pub async fn shutdown(&mut self) -> Result<()> {
tracing::info!("Shutting down raft node {}", self.node_id);
#[cfg(feature = "raft")]
{
if let Some(raft) = self.raft.take() {
raft.shutdown()
.await
.map_err(|e| anyhow::anyhow!("Failed to shutdown raft: {}", e))?;
tracing::info!("Raft instance shutdown completed");
}
}
{
let mut storage = self.storage.write().await;
*storage = RdfApp::default();
}
tracing::info!("Raft node {} shutdown completed", self.node_id);
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_rdf_command_serialization() {
let cmd = RdfCommand::Insert {
subject: "s".to_string(),
predicate: "p".to_string(),
object: "o".to_string(),
};
let serialized = serde_json::to_string(&cmd).unwrap();
let deserialized: RdfCommand = serde_json::from_str(&serialized).unwrap();
assert_eq!(cmd, deserialized);
}
#[test]
fn test_rdf_response_serialization() {
let response = RdfResponse::Success;
let serialized = serde_json::to_string(&response).unwrap();
let deserialized: RdfResponse = serde_json::from_str(&serialized).unwrap();
assert_eq!(response, deserialized);
}
#[test]
fn test_rdf_app_apply_insert() {
let mut app = RdfApp::default();
let cmd = RdfCommand::Insert {
subject: "s".to_string(),
predicate: "p".to_string(),
object: "o".to_string(),
};
let response = app.apply_command(&cmd);
assert_eq!(response, RdfResponse::Success);
assert_eq!(app.len(), 1);
assert!(!app.is_empty());
}
#[test]
fn test_rdf_app_apply_delete() {
let mut app = RdfApp::default();
let insert_cmd = RdfCommand::Insert {
subject: "s".to_string(),
predicate: "p".to_string(),
object: "o".to_string(),
};
app.apply_command(&insert_cmd);
assert_eq!(app.len(), 1);
let delete_cmd = RdfCommand::Delete {
subject: "s".to_string(),
predicate: "p".to_string(),
object: "o".to_string(),
};
let response = app.apply_command(&delete_cmd);
assert_eq!(response, RdfResponse::Success);
assert_eq!(app.len(), 0);
assert!(app.is_empty());
}
#[test]
fn test_rdf_app_apply_clear() {
let mut app = RdfApp::default();
app.apply_command(&RdfCommand::Insert {
subject: "s1".to_string(),
predicate: "p1".to_string(),
object: "o1".to_string(),
});
app.apply_command(&RdfCommand::Insert {
subject: "s2".to_string(),
predicate: "p2".to_string(),
object: "o2".to_string(),
});
assert_eq!(app.len(), 2);
let response = app.apply_command(&RdfCommand::Clear);
assert_eq!(response, RdfResponse::Success);
assert_eq!(app.len(), 0);
assert!(app.is_empty());
}
#[test]
fn test_rdf_app_transactions() {
let mut app = RdfApp::default();
let tx_id = "tx1".to_string();
let response = app.apply_command(&RdfCommand::BeginTransaction {
tx_id: tx_id.clone(),
});
assert_eq!(
response,
RdfResponse::TransactionStarted {
tx_id: tx_id.clone()
}
);
assert!(app.transactions.contains_key(&tx_id));
let response = app.apply_command(&RdfCommand::CommitTransaction {
tx_id: tx_id.clone(),
});
assert_eq!(
response,
RdfResponse::TransactionCommitted {
tx_id: tx_id.clone()
}
);
assert!(!app.transactions.contains_key(&tx_id));
}
#[test]
fn test_rdf_app_transaction_rollback() {
let mut app = RdfApp::default();
let tx_id = "tx1".to_string();
app.apply_command(&RdfCommand::BeginTransaction {
tx_id: tx_id.clone(),
});
assert!(app.transactions.contains_key(&tx_id));
let response = app.apply_command(&RdfCommand::RollbackTransaction {
tx_id: tx_id.clone(),
});
assert_eq!(
response,
RdfResponse::TransactionRolledBack {
tx_id: tx_id.clone()
}
);
assert!(!app.transactions.contains_key(&tx_id));
}
#[test]
fn test_rdf_app_query() {
let mut app = RdfApp::default();
app.apply_command(&RdfCommand::Insert {
subject: "s1".to_string(),
predicate: "p1".to_string(),
object: "o1".to_string(),
});
app.apply_command(&RdfCommand::Insert {
subject: "s1".to_string(),
predicate: "p2".to_string(),
object: "o2".to_string(),
});
app.apply_command(&RdfCommand::Insert {
subject: "s2".to_string(),
predicate: "p1".to_string(),
object: "o3".to_string(),
});
let results = app.query(None, None, None);
assert_eq!(results.len(), 3);
let results = app.query(Some("s1"), None, None);
assert_eq!(results.len(), 2);
let results = app.query(None, Some("p1"), None);
assert_eq!(results.len(), 2);
let results = app.query(None, None, Some("o1"));
assert_eq!(results.len(), 1);
let results = app.query(Some("s1"), Some("p1"), Some("o1"));
assert_eq!(results.len(), 1);
assert_eq!(
results[0],
("s1".to_string(), "p1".to_string(), "o1".to_string())
);
}
#[tokio::test]
async fn test_raft_node_creation() {
let node = RaftNode::new(1);
assert_eq!(node.node_id, 1);
assert!(node.is_leader().await);
assert_eq!(node.current_term().await, 0);
assert_eq!(node.len().await, 0);
assert!(node.is_empty().await);
}
#[tokio::test]
async fn test_raft_node_local_operations() {
let node = RaftNode::new(1);
let cmd = RdfCommand::Insert {
subject: "s".to_string(),
predicate: "p".to_string(),
object: "o".to_string(),
};
let response = node.submit_command(cmd).await.unwrap();
assert_eq!(response, RdfResponse::Success);
assert_eq!(node.len().await, 1);
assert!(!node.is_empty().await);
let results = node.query(Some("s"), Some("p"), Some("o")).await;
assert_eq!(results.len(), 1);
assert_eq!(
results[0],
("s".to_string(), "p".to_string(), "o".to_string())
);
}
}