use std::sync::Arc;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tracing::debug;
use oxirs_cluster::consensus::ConsensusManager;
use super::raft_state::{RaftBackedOperatorState, StateValue};
/// Errors returned by [`LinearizableReader`] read operations.
#[derive(Debug, Error)]
pub enum LinearizableReadError {
    /// The reader is configured to require leadership and the consensus layer
    /// reported that the local node is not the leader.
    #[error("local node is not the leader")]
    NotLeader,
    /// A consensus call made while serving the read failed. The payload is the
    /// underlying error rendered with `to_string()`.
    #[error("state error: {0}")]
    State(String),
}
/// Result alias whose error type is fixed to [`LinearizableReadError`].
pub type LinearizableReadResult<T> = std::result::Result<T, LinearizableReadError>;
/// Read policy for [`LinearizableReader`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LinearizableReadConfig {
    /// When `true`, reads fail with [`LinearizableReadError::NotLeader`] unless
    /// the consensus layer reports the local node as leader. When `false`, the
    /// leadership check is skipped entirely. The `Default` impl sets this to
    /// `true`.
    pub require_leader: bool,
}
impl Default for LinearizableReadConfig {
fn default() -> Self {
Self {
require_leader: true,
}
}
}
/// Serves reads of operator state, optionally gated on local leadership.
///
/// NOTE(review): the name suggests linearizable semantics, but the actual
/// guarantee depends on `ConsensusManager` behaviour that is not visible in
/// this file — confirm before relying on it.
pub struct LinearizableReader {
    /// Consensus handle queried for leadership and term, and used to run the
    /// barrier transaction in `get_with_barrier`.
    consensus: Arc<ConsensusManager>,
    /// Operator state that reads are served from via `get_local`.
    state: Arc<RaftBackedOperatorState>,
    /// Read policy (whether leadership is required before serving).
    config: LinearizableReadConfig,
}
impl LinearizableReader {
pub fn new(
consensus: Arc<ConsensusManager>,
state: Arc<RaftBackedOperatorState>,
config: LinearizableReadConfig,
) -> Self {
Self {
consensus,
state,
config,
}
}
pub async fn is_leader(&self) -> bool {
self.consensus.is_leader().await
}
pub async fn term(&self) -> u64 {
self.consensus.current_term().await
}
pub async fn get(&self, key: &str) -> LinearizableReadResult<Option<StateValue>> {
let term = self.consensus.current_term().await;
if self.config.require_leader && !self.consensus.is_leader().await {
return Err(LinearizableReadError::NotLeader);
}
debug!(term, key, "linearizable read served (sticky)");
Ok(self.state.get_local(key))
}
pub async fn get_with_barrier(&self, key: &str) -> LinearizableReadResult<Option<StateValue>> {
if self.config.require_leader && !self.consensus.is_leader().await {
return Err(LinearizableReadError::NotLeader);
}
let tx_id = format!("oxirs-stream-barrier-{}", uuid::Uuid::new_v4());
self.consensus
.begin_transaction(tx_id.clone())
.await
.map_err(|e| LinearizableReadError::State(e.to_string()))?;
self.consensus
.rollback_transaction(tx_id)
.await
.map_err(|e| LinearizableReadError::State(e.to_string()))?;
Ok(self.state.get_local(key))
}
pub fn state(&self) -> &Arc<RaftBackedOperatorState> {
&self.state
}
pub fn consensus(&self) -> &Arc<ConsensusManager> {
&self.consensus
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::state::raft_state::{RaftBackedOperatorState, RaftStateConfig, StateValue};
    use async_trait::async_trait;
    use oxirs_cluster::stream_integration::StreamMessage;
    use oxirs_cluster::streaming::cluster_sink::{SinkError, StreamSink};
    use parking_lot::Mutex;

    /// Sink that accepts every batch and only counts how many were written.
    #[derive(Default)]
    struct PassThroughSink {
        committed: Mutex<u64>,
    }

    #[async_trait]
    impl StreamSink for PassThroughSink {
        async fn write_batch(&self, _events: Vec<StreamMessage>) -> Result<(), SinkError> {
            *self.committed.lock() += 1;
            Ok(())
        }
    }

    /// Builds operator state named `operator_id` (no stream id) on top of a
    /// fresh [`PassThroughSink`].
    fn fresh_state(operator_id: &'static str) -> Arc<RaftBackedOperatorState> {
        let sink = Arc::new(PassThroughSink::default());
        let config = RaftStateConfig {
            operator_id: operator_id.into(),
            stream_id: None,
        };
        Arc::new(RaftBackedOperatorState::new(config, sink))
    }

    /// Builds a reader with an explicit `require_leader` policy.
    fn reader_with(
        consensus: Arc<ConsensusManager>,
        state: Arc<RaftBackedOperatorState>,
        require_leader: bool,
    ) -> LinearizableReader {
        LinearizableReader::new(
            consensus,
            state,
            LinearizableReadConfig { require_leader },
        )
    }

    #[tokio::test]
    async fn read_returns_locally_cached_value_when_not_requiring_leader() {
        let consensus = Arc::new(ConsensusManager::new(1, vec![]));
        let state = fresh_state("lin-test");
        state.put("k", StateValue::Counter(11)).await.expect("put");

        let reader = reader_with(consensus, state, false);

        let observed = reader.get("k").await.expect("ok");
        assert_eq!(observed, Some(StateValue::Counter(11)));
    }

    #[tokio::test]
    async fn missing_key_returns_none() {
        let consensus = Arc::new(ConsensusManager::new(2, vec![]));
        let state = fresh_state("lin-test");

        let reader = reader_with(consensus, state, false);

        let observed = reader.get("ghost").await.expect("ok");
        assert!(observed.is_none());
    }

    #[tokio::test]
    async fn get_with_barrier_succeeds_on_leader_after_put() {
        use oxirs_cluster::raft::{init_global_shared_storage, reset_global_shared_storage};

        // NOTE(review): this lock is declared inside the test function, so it
        // can only serialize this test against itself. Confirm whether other
        // tests touching the global shared storage need to share it.
        static TEST_LOCK: tokio::sync::Mutex<()> = tokio::sync::Mutex::const_new(());
        let _guard = TEST_LOCK.lock().await;

        init_global_shared_storage();
        reset_global_shared_storage().await;

        let consensus = Arc::new(ConsensusManager::new(11, vec![]));
        let state = fresh_state("lin-barrier");
        state
            .put("counter", StateValue::Counter(7))
            .await
            .expect("put");

        let reader = reader_with(consensus, state, true);

        let observed = reader.get_with_barrier("counter").await.expect("ok");
        assert_eq!(observed, Some(StateValue::Counter(7)));
    }

    #[tokio::test]
    async fn require_leader_true_serves_when_local_is_leader() {
        let consensus = Arc::new(ConsensusManager::new(10, vec![]));
        let state = fresh_state("lin-test");
        state.put("k", StateValue::Counter(99)).await.expect("put");

        let reader = reader_with(consensus, state, true);

        assert!(reader.is_leader().await);
        let observed = reader.get("k").await.expect("ok");
        assert_eq!(observed, Some(StateValue::Counter(99)));
    }

    #[tokio::test]
    async fn term_query_returns_finite_value() {
        let consensus = Arc::new(ConsensusManager::new(3, vec![]));
        let state = fresh_state("lin-test");

        let reader = LinearizableReader::new(consensus, state, LinearizableReadConfig::default());

        // Only checks that the term query completes; the value is not inspected.
        let _ = reader.term().await;
    }
}