use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::{mpsc, RwLock};
use crate::{
clustering::{ConsistencyLevel, NodeInfo, ReplicationConfig},
error::{FusekiError, FusekiResult},
store::Store,
};
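/// Result of a distributed query, serialized for transport between nodes.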
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryResult {
Boolean(bool),
Data(String),
Error(String),
}
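/// A query fanned out across cluster partitions, tagged with the consistency
/// level and timeout the coordinator must honor.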
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedQuery {
pub id: String,
pub query: String,
pub partitions: Vec<u32>,
pub consistency: ConsistencyLevel,
pub timeout: Duration,
}
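/// A single node's answer to a distributed query, including how long the node
/// took to execute it.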
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryResponse {
pub node_id: String,
pub result: QueryResult,
pub execution_time: Duration,
pub success: bool,
pub error: Option<String>,
}
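/// A write operation fanned out across cluster partitions, mirroring
/// `DistributedQuery`.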
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DistributedWrite {
pub id: String,
pub operation: WriteOperation,
pub partitions: Vec<u32>,
pub consistency: ConsistencyLevel,
pub timeout: Duration,
}
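/// String-encoded triple, used so write operations can cross node boundaries
/// via serde.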
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableTriple {
pub subject: String,
pub predicate: String,
pub object: String,
}
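/// String-encoded quad: a triple plus the named graph it belongs to.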
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SerializableQuad {
pub subject: String,
pub predicate: String,
pub object: String,
pub graph: String,
}
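/// The mutations a coordinator can replicate to other nodes.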
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum WriteOperation {
AddTriple {
triple: SerializableTriple,
graph: Option<String>,
},
RemoveTriple {
triple: SerializableTriple,
graph: Option<String>,
},
AddQuad { quad: SerializableQuad },
RemoveQuad { quad: SerializableQuad },
ClearGraph { graph: String },
}
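/// A single node's acknowledgement (or failure report) for a replicated write.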
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WriteResponse {
pub node_id: String,
pub success: bool,
pub error: Option<String>,
}
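/// Coordinates distributed queries and writes: fans requests out to the nodes
/// owning the target partitions, then waits until the requested consistency
/// level is satisfied.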
pub struct QueryCoordinator {
config: ReplicationConfig,
store: Arc<Store>,
node_connections: Arc<RwLock<HashMap<String, NodeConnection>>>,
request_tracker: Arc<RwLock<HashMap<String, RequestStatus>>>,
}
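/// Channel endpoints for a peer node. Currently unused: requests execute
/// locally until network dispatch is implemented.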
#[allow(dead_code)]
struct NodeConnection {
node_info: NodeInfo,
request_tx: mpsc::Sender<CoordinatorRequest>,
response_rx: Arc<RwLock<mpsc::Receiver<CoordinatorResponse>>>,
}
#[derive(Debug, Clone)]
#[allow(dead_code)]
enum CoordinatorRequest {
Query(DistributedQuery),
Write(DistributedWrite),
}
#[derive(Debug, Clone)]
enum CoordinatorResponse {
Query(QueryResponse),
Write(WriteResponse),
}
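/// Bookkeeping for one in-flight request; per-node responses are appended
/// here as they arrive.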
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct RequestStatus {
start_time: Instant,
expected_responses: usize,
responses: Vec<CoordinatorResponse>,
completed: bool,
}
impl QueryCoordinator {
pub fn new(config: ReplicationConfig, store: Arc<Store>) -> Self {
Self {
config,
store,
node_connections: Arc::new(RwLock::new(HashMap::new())),
request_tracker: Arc::new(RwLock::new(HashMap::new())),
}
}
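    /// Execute a query against all nodes owning the given partitions and
    /// return a merged result once the consistency level is met, or an error
    /// on timeout.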
pub async fn execute_query(&self, query: DistributedQuery) -> FusekiResult<QueryResult> {
let start_time = Instant::now();
let nodes = self.get_nodes_for_partitions(&query.partitions).await?;
if nodes.is_empty() {
return Err(FusekiError::Internal {
message: "No nodes available for query execution".to_string(),
});
}
let required_responses = self.calculate_required_responses(nodes.len(), query.consistency);
let mut tracker = self.request_tracker.write().await;
tracker.insert(
query.id.clone(),
RequestStatus {
start_time,
expected_responses: nodes.len(),
responses: Vec::new(),
completed: false,
},
);
drop(tracker);
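        // Fan out to each target node. The tasks are detached: their results
        // come back through the shared request tracker, not the join handles.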
let mut response_futures = Vec::new();
for node_id in nodes {
let query_clone = query.clone();
let coordinator = self.clone();
response_futures.push(tokio::spawn(async move {
coordinator.send_query_to_node(&node_id, query_clone).await
}));
}
let responses = match tokio::time::timeout(
query.timeout,
self.collect_responses(&query.id, required_responses),
)
.await
{
Ok(responses) => responses?,
            Err(_) => {
                // Remove the tracker entry so a timed-out request does not leak.
                self.request_tracker.write().await.remove(&query.id);
                return Err(FusekiError::Internal {
                    message: "Query timed out".to_string(),
                });
            }
};
let result = self.merge_query_results(responses)?;
let mut tracker = self.request_tracker.write().await;
tracker.remove(&query.id);
Ok(result)
}
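    /// Replicate a write to all nodes owning the given partitions, failing if
    /// fewer than the required number of nodes acknowledge in time.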
pub async fn execute_write(&self, write: DistributedWrite) -> FusekiResult<()> {
let start_time = Instant::now();
let nodes = self.get_nodes_for_partitions(&write.partitions).await?;
if nodes.is_empty() {
return Err(FusekiError::Internal {
message: "No nodes available for write operation".to_string(),
});
}
let required_responses = self.calculate_required_responses(nodes.len(), write.consistency);
let mut tracker = self.request_tracker.write().await;
tracker.insert(
write.id.clone(),
RequestStatus {
start_time,
expected_responses: nodes.len(),
responses: Vec::new(),
completed: false,
},
);
drop(tracker);
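        // Fan the write out to each target node; acknowledgements arrive via
        // the shared request tracker.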
let mut response_futures = Vec::new();
for node_id in nodes {
let write_clone = write.clone();
let coordinator = self.clone();
response_futures.push(tokio::spawn(async move {
coordinator.send_write_to_node(&node_id, write_clone).await
}));
}
let responses = match tokio::time::timeout(
write.timeout,
self.collect_responses(&write.id, required_responses),
)
.await
{
Ok(responses) => responses?,
            Err(_) => {
                // Remove the tracker entry so a timed-out request does not leak.
                self.request_tracker.write().await.remove(&write.id);
                return Err(FusekiError::Internal {
                    message: "Write timed out".to_string(),
                });
            }
};
let successful_count = responses
.iter()
.filter(|r| match r {
CoordinatorResponse::Write(w) => w.success,
_ => false,
})
.count();
        if successful_count < required_responses {
            // Clean up before reporting the failure so the entry does not leak.
            self.request_tracker.write().await.remove(&write.id);
            return Err(FusekiError::Internal {
                message: format!(
                    "Write failed: only {successful_count} of {required_responses} required responses succeeded"
                ),
            });
        }
let mut tracker = self.request_tracker.write().await;
tracker.remove(&write.id);
Ok(())
}
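    /// Resolve which nodes own the given partitions. Placeholder: always
    /// returns a single static node until partition routing is wired in.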
async fn get_nodes_for_partitions(&self, _partitions: &[u32]) -> FusekiResult<Vec<String>> {
Ok(vec!["node1".to_string()])
}
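    /// Map a consistency level to the number of acknowledgements required.
    /// `LocalQuorum` and `EachQuorum` currently use the same arithmetic as
    /// `Quorum` and `All`, since the coordinator is not yet datacenter-aware.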
fn calculate_required_responses(
&self,
total_nodes: usize,
consistency: ConsistencyLevel,
) -> usize {
match consistency {
ConsistencyLevel::One => 1,
ConsistencyLevel::Quorum => (total_nodes / 2) + 1,
ConsistencyLevel::All => total_nodes,
            ConsistencyLevel::LocalQuorum => (total_nodes / 2) + 1,
            ConsistencyLevel::EachQuorum => total_nodes,
        }
}
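    /// Run a query on behalf of `node_id` and record the response. Execution
    /// is local for now; if it fails, no response is recorded and the request
    /// eventually times out at the coordinator.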
async fn send_query_to_node(&self, node_id: &str, query: DistributedQuery) -> FusekiResult<()> {
let start = Instant::now();
let result = self.execute_local_query(&query).await?;
let response = QueryResponse {
node_id: node_id.to_string(),
result,
execution_time: start.elapsed(),
success: true,
error: None,
};
let mut tracker = self.request_tracker.write().await;
if let Some(status) = tracker.get_mut(&query.id) {
status.responses.push(CoordinatorResponse::Query(response));
}
Ok(())
}
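    /// Apply a write on behalf of `node_id` and record success or failure in
    /// the tracker.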
async fn send_write_to_node(&self, node_id: &str, write: DistributedWrite) -> FusekiResult<()> {
let success = self.execute_local_write(&write).await.is_ok();
let response = WriteResponse {
node_id: node_id.to_string(),
success,
error: if success {
None
} else {
Some("Write failed".to_string())
},
};
let mut tracker = self.request_tracker.write().await;
if let Some(status) = tracker.get_mut(&write.id) {
status.responses.push(CoordinatorResponse::Write(response));
}
Ok(())
}
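    /// Placeholder for local query execution against the underlying store.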
async fn execute_local_query(&self, _query: &DistributedQuery) -> FusekiResult<QueryResult> {
Ok(QueryResult::Boolean(false))
}
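    /// Placeholder for applying a write operation to the underlying store.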
async fn execute_local_write(&self, _write: &DistributedWrite) -> FusekiResult<()> {
Ok(())
}
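    /// Poll the tracker until enough responses have arrived. The caller wraps
    /// this in a timeout, so the loop itself never gives up.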
async fn collect_responses(
&self,
request_id: &str,
required: usize,
) -> FusekiResult<Vec<CoordinatorResponse>> {
let check_interval = Duration::from_millis(10);
loop {
tokio::time::sleep(check_interval).await;
let tracker = self.request_tracker.read().await;
if let Some(status) = tracker.get(request_id) {
if status.responses.len() >= required {
let collected = status.responses.clone();
return Ok(collected);
}
} else {
return Err(FusekiError::Internal {
message: "Request not found".to_string(),
});
}
}
}
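    /// Merge per-node results. Naive for now: returns the first successful
    /// result rather than reconciling results across partitions.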
fn merge_query_results(
&self,
responses: Vec<CoordinatorResponse>,
) -> FusekiResult<QueryResult> {
let mut results = Vec::new();
for response in responses {
if let CoordinatorResponse::Query(query_resp) = response {
if query_resp.success {
results.push(query_resp.result);
}
}
}
        results
            .into_iter()
            .next()
            .ok_or_else(|| FusekiError::Internal {
                message: "No successful query responses".to_string(),
            })
}
}

// `Clone` is implemented as a trait so clones share the same connection map
// and request tracker through their `Arc` handles.
impl Clone for QueryCoordinator {
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            store: Arc::clone(&self.store),
            node_connections: Arc::clone(&self.node_connections),
            request_tracker: Arc::clone(&self.request_tracker),
        }
    }
}
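/// Repairs stale replicas detected during reads by pushing the latest value
/// back to nodes that returned divergent results.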
pub struct ReadRepair {
#[allow(dead_code)]
coordinator: Arc<QueryCoordinator>,
}
impl ReadRepair {
pub fn new(coordinator: Arc<QueryCoordinator>) -> Self {
Self { coordinator }
}
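    /// Compare the responses for a key, then push the authoritative value to
    /// any node that returned something different.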
pub async fn repair(&self, _key: &str, responses: Vec<QueryResponse>) -> FusekiResult<()> {
let latest = self.find_latest_value(&responses)?;
let stale_nodes = self.find_stale_nodes(&responses, &latest);
if !stale_nodes.is_empty() {
self.repair_nodes(&stale_nodes, &latest).await?;
}
Ok(())
}
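    /// Pick the response to treat as authoritative. Responses carry no
    /// version metadata yet, so execution time serves as a stand-in ordering.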
fn find_latest_value(&self, responses: &[QueryResponse]) -> FusekiResult<QueryResponse> {
responses
.iter()
.filter(|r| r.success)
.max_by_key(|r| r.execution_time)
.cloned()
.ok_or_else(|| FusekiError::Internal {
message: "No successful responses for read repair".to_string(),
})
}
fn find_stale_nodes(&self, responses: &[QueryResponse], latest: &QueryResponse) -> Vec<String> {
responses
.iter()
.filter(|r| r.success && !self.results_equal(&r.result, &latest.result))
.map(|r| r.node_id.clone())
.collect()
}
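    /// Placeholder comparison: conservatively reports every pair of results
    /// as divergent, so all responding nodes become repair candidates.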
fn results_equal(&self, _a: &QueryResult, _b: &QueryResult) -> bool {
false
}
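    /// Placeholder: pushing repaired values to stale nodes is not implemented.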
async fn repair_nodes(&self, _nodes: &[String], _latest: &QueryResponse) -> FusekiResult<()> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_consistency_level_calculation() {
let config = ReplicationConfig::default();
let store = Arc::new(Store::new().unwrap());
let coordinator = QueryCoordinator::new(config, store);
assert_eq!(
coordinator.calculate_required_responses(3, ConsistencyLevel::One),
1
);
assert_eq!(
coordinator.calculate_required_responses(3, ConsistencyLevel::Quorum),
2
);
assert_eq!(
coordinator.calculate_required_responses(3, ConsistencyLevel::All),
3
);
assert_eq!(
coordinator.calculate_required_responses(5, ConsistencyLevel::Quorum),
3
);
assert_eq!(
coordinator.calculate_required_responses(7, ConsistencyLevel::Quorum),
4
);
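        // LocalQuorum mirrors Quorum and EachQuorum mirrors All in the
        // current implementation.
        assert_eq!(
            coordinator.calculate_required_responses(5, ConsistencyLevel::LocalQuorum),
            3
        );
        assert_eq!(
            coordinator.calculate_required_responses(5, ConsistencyLevel::EachQuorum),
            5
        );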
}
}