use std::collections::BTreeSet;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::watch;
use crabka_metadata::{MetadataImage, MetadataRecord};
use crabka_raft::{
AddVoter, ControllerHandle, Node, NodeId, OutboundDialer, QuorumState, RaftError,
ReconfigOutcome, RemoveVoter, SnapshotRange, UpdateVoter,
};
use crate::metadata_observer::MetadataObserver;
#[async_trait::async_trait]
pub trait MetadataSource: Send + Sync {
fn current_image(&self) -> Arc<MetadataImage>;
fn watch_image(&self) -> watch::Receiver<Arc<MetadataImage>>;
fn watch_leader(&self) -> watch::Receiver<Option<NodeId>>;
fn quorum_state(&self) -> QuorumState;
async fn submit_change(&self, records: Vec<MetadataRecord>) -> Result<(), RaftError>;
async fn change_membership(&self, new_voters: BTreeSet<NodeId>) -> Result<(), RaftError>;
async fn add_learner(&self, node_id: NodeId, node: Node) -> Result<(), RaftError>;
fn controller_bound_addr(&self) -> SocketAddr;
fn read_snapshot_range(&self, position: i64, max_bytes: i32) -> SnapshotRange;
async fn trigger_snapshot(&self) -> Result<(), RaftError>;
async fn add_voter(&self, req: AddVoter) -> Result<ReconfigOutcome, RaftError>;
async fn remove_voter(&self, req: RemoveVoter) -> Result<ReconfigOutcome, RaftError>;
async fn update_voter(&self, req: UpdateVoter) -> Result<ReconfigOutcome, RaftError>;
async fn cancel(&self);
}
#[async_trait::async_trait]
impl MetadataSource for ControllerHandle {
fn current_image(&self) -> Arc<MetadataImage> {
ControllerHandle::current_image(self)
}
fn watch_image(&self) -> watch::Receiver<Arc<MetadataImage>> {
ControllerHandle::watch_image(self)
}
fn watch_leader(&self) -> watch::Receiver<Option<NodeId>> {
ControllerHandle::watch_leader(self)
}
fn quorum_state(&self) -> QuorumState {
ControllerHandle::quorum_state(self)
}
async fn submit_change(&self, records: Vec<MetadataRecord>) -> Result<(), RaftError> {
ControllerHandle::submit_change(self, records).await
}
async fn change_membership(&self, new_voters: BTreeSet<NodeId>) -> Result<(), RaftError> {
ControllerHandle::change_membership(self, new_voters).await
}
async fn add_learner(&self, node_id: NodeId, node: Node) -> Result<(), RaftError> {
ControllerHandle::add_learner(self, node_id, node).await
}
fn controller_bound_addr(&self) -> SocketAddr {
ControllerHandle::controller_bound_addr(self)
}
fn read_snapshot_range(&self, position: i64, max_bytes: i32) -> SnapshotRange {
ControllerHandle::read_snapshot_range(self, position, max_bytes)
}
async fn trigger_snapshot(&self) -> Result<(), RaftError> {
ControllerHandle::trigger_snapshot(self).await
}
async fn add_voter(&self, req: AddVoter) -> Result<ReconfigOutcome, RaftError> {
ControllerHandle::add_voter(self, req).await
}
async fn remove_voter(&self, req: RemoveVoter) -> Result<ReconfigOutcome, RaftError> {
ControllerHandle::remove_voter(self, req).await
}
async fn update_voter(&self, req: UpdateVoter) -> Result<ReconfigOutcome, RaftError> {
ControllerHandle::update_voter(self, req).await
}
async fn cancel(&self) {
ControllerHandle::cancel(self).await;
}
}
pub struct ObserverSource {
observer: Arc<MetadataObserver>,
writer: Arc<dyn MetadataWriter>,
}
#[async_trait::async_trait]
pub trait MetadataWriter: Send + Sync {
async fn submit_change(&self, records: Vec<MetadataRecord>) -> Result<(), RaftError>;
}
impl ObserverSource {
#[must_use]
pub fn new(observer: Arc<MetadataObserver>, writer: Arc<dyn MetadataWriter>) -> Self {
Self { observer, writer }
}
}
#[async_trait::async_trait]
impl MetadataSource for ObserverSource {
fn current_image(&self) -> Arc<MetadataImage> {
self.observer.current_image()
}
fn watch_image(&self) -> watch::Receiver<Arc<MetadataImage>> {
self.observer.watch_image()
}
fn watch_leader(&self) -> watch::Receiver<Option<NodeId>> {
self.observer.watch_leader()
}
fn quorum_state(&self) -> QuorumState {
QuorumState {
current_term: 0,
last_applied_index: 0,
current_leader: *self.observer.watch_leader().borrow(),
voters: Vec::new(),
voter_nodes: std::collections::BTreeMap::new(),
per_voter_matched_index: std::collections::BTreeMap::new(),
}
}
async fn submit_change(&self, records: Vec<MetadataRecord>) -> Result<(), RaftError> {
self.writer.submit_change(records).await
}
async fn change_membership(&self, _new_voters: BTreeSet<NodeId>) -> Result<(), RaftError> {
Err(RaftError::NotLeader {
current_leader: None,
})
}
async fn add_learner(&self, _node_id: NodeId, _node: Node) -> Result<(), RaftError> {
Err(RaftError::NotLeader {
current_leader: None,
})
}
fn controller_bound_addr(&self) -> SocketAddr {
SocketAddr::from(([0, 0, 0, 0], 0))
}
fn read_snapshot_range(&self, _position: i64, _max_bytes: i32) -> SnapshotRange {
SnapshotRange::NoSnapshot
}
async fn trigger_snapshot(&self) -> Result<(), RaftError> {
Err(RaftError::NotLeader {
current_leader: None,
})
}
async fn add_voter(&self, _req: AddVoter) -> Result<ReconfigOutcome, RaftError> {
Err(RaftError::NotLeader {
current_leader: None,
})
}
async fn remove_voter(&self, _req: RemoveVoter) -> Result<ReconfigOutcome, RaftError> {
Err(RaftError::NotLeader {
current_leader: None,
})
}
async fn update_voter(&self, _req: UpdateVoter) -> Result<ReconfigOutcome, RaftError> {
Err(RaftError::NotLeader {
current_leader: None,
})
}
async fn cancel(&self) {
self.observer.cancel().await;
}
}
pub struct QuorumForwarder {
pub(crate) voters: Vec<(NodeId, SocketAddr)>,
pub(crate) dialer: Arc<dyn OutboundDialer>,
pub(crate) client_id: String,
pub(crate) leader: watch::Receiver<Option<NodeId>>,
}
impl QuorumForwarder {
async fn try_submit(
&self,
target: NodeId,
addr: SocketAddr,
body: &[u8],
) -> Result<crabka_raft::CrabkaSubmitChangeResponse, RaftError> {
let opts = crabka_client_core::ConnectionOptions {
client_id: self.client_id.clone(),
..crabka_client_core::ConnectionOptions::default()
};
let conn = self
.dialer
.dial(target, &addr.to_string(), opts)
.await
.map_err(RaftError::Network)?;
let resp_body = conn
.raw_request(
crabka_raft::API_KEY_SUBMIT_CHANGE,
0,
bytes::Bytes::copy_from_slice(body),
)
.await
.map_err(RaftError::Network)?;
conn.close();
let mut cur: &[u8] = &resp_body;
crabka_raft::CrabkaSubmitChangeResponse::decode_v0(&mut cur).map_err(RaftError::Protocol)
}
}
#[async_trait::async_trait]
impl MetadataWriter for QuorumForwarder {
async fn submit_change(&self, records: Vec<MetadataRecord>) -> Result<(), RaftError> {
let payload =
<serde_wincode::SerdeCompat<Vec<MetadataRecord>> as wincode::Serialize>::serialize(
&records,
)
.map_err(RaftError::from)?;
let req = crabka_raft::CrabkaSubmitChangeRequest {
records: bytes::Bytes::from(payload),
};
let mut body = Vec::with_capacity(req.records.len() + 4);
req.encode_v0(&mut body).map_err(RaftError::Protocol)?;
let hint = *self.leader.borrow();
let mut order: Vec<(NodeId, SocketAddr)> = Vec::new();
if let Some(l) = hint
&& let Some(t) = self.voters.iter().find(|(id, _)| *id == l)
{
order.push(*t);
}
for v in &self.voters {
if Some(v.0) != hint {
order.push(*v);
}
}
let mut last_err = RaftError::NotLeader {
current_leader: hint,
};
for (target, addr) in order {
match self.try_submit(target, addr, &body).await {
Ok(resp) if resp.error_code == 0 => return Ok(()),
Ok(resp) if resp.error_code == 2 => {
return Err(RaftError::Metadata(
crabka_metadata::MetadataError::TopicExists(String::new()),
));
}
Ok(resp) => {
last_err = RaftError::NotLeader {
current_leader: (resp.leader_hint >= 0)
.then(|| u64::try_from(resp.leader_hint).unwrap_or(0)),
};
}
Err(e) => last_err = e,
}
}
Err(last_err)
}
}