use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, warn};
use crabka_metadata::{MetadataImage, from_kraft_value};
use crabka_protocol::records::RecordBatch;
use crabka_raft::{NodeId, OutboundDialer};
/// Settings consumed by [`MetadataObserver::start`] and its background fetch loop.
#[derive(Clone)]
pub struct ObserverConfig {
/// Nodes to fetch metadata from, as `(node id, address)` pairs. The fetch loop
/// works through them round-robin, moving to the next entry after a failed fetch.
pub voters: Vec<(NodeId, SocketAddr)>,
/// Dialer used to open the connection to the selected voter for each fetch.
pub dialer: Arc<dyn OutboundDialer>,
/// Client id placed in the connection options of every outbound connection.
pub client_id: String,
/// Cluster id passed to `MetadataImage::new` to build the initial image.
pub cluster_id: uuid::Uuid,
/// Value sent as `max_bytes` in each fetch request (clamped to `i32::MAX`).
pub max_bytes: u32,
/// Pause inserted after a failed fetch, after a fetch that made no progress,
/// and while `voters` is empty.
pub poll_interval: Duration,
}
/// Handle to a background task that repeatedly fetches metadata records from the
/// configured voters and publishes the resulting image through a watch channel.
pub struct MetadataObserver {
// Latest published image; read by `current_image` and subscribed to by `watch_image`.
image: watch::Sender<Arc<MetadataImage>>,
// Starts as `None`; set to the id of the voter that most recently answered a
// fetch successfully.
leader: watch::Sender<Option<NodeId>>,
// Cancelled by `cancel` to stop the background loop.
shutdown: CancellationToken,
// Join handle of the background loop; taken (and awaited) by `cancel`.
task: tokio::sync::Mutex<Option<JoinHandle<()>>>,
}
impl MetadataObserver {
    /// Builds the observer and spawns its background fetch loop with `tokio::spawn`.
    ///
    /// The image channel is seeded with `MetadataImage::new(config.cluster_id)` and
    /// the leader channel with `None`. Because the loop is spawned here, this must
    /// be called from within a Tokio runtime.
    #[must_use]
    pub fn start(config: ObserverConfig) -> Arc<Self> {
        let initial_image = Arc::new(MetadataImage::new(config.cluster_id));
        // Only the sender halves are kept; receivers are created on demand through
        // `subscribe`, so the initial receivers are dropped right away.
        let image = watch::channel(initial_image).0;
        let leader = watch::channel::<Option<NodeId>>(None).0;
        let loop_token = CancellationToken::new();

        let this = Arc::new(Self {
            image,
            leader,
            shutdown: loop_token.clone(),
            task: tokio::sync::Mutex::new(None),
        });

        let handle = tokio::spawn(run_loop(config, Arc::clone(&this), loop_token));
        // Stash the handle so `cancel` can wait for the loop to exit. If the lock
        // cannot be taken the handle is dropped and the task keeps running detached.
        if let Ok(mut slot) = this.task.try_lock() {
            *slot = Some(handle);
        }
        this
    }

    /// Returns the most recently published metadata image.
    #[must_use]
    pub fn current_image(&self) -> Arc<MetadataImage> {
        let latest = self.image.borrow();
        Arc::clone(&*latest)
    }

    /// Returns a new receiver for the image channel.
    #[must_use]
    pub fn watch_image(&self) -> watch::Receiver<Arc<MetadataImage>> {
        self.image.subscribe()
    }

    /// Returns a new receiver for the leader channel.
    #[must_use]
    pub fn watch_leader(&self) -> watch::Receiver<Option<NodeId>> {
        self.leader.subscribe()
    }

    /// Signals the background loop to stop and waits for it to finish.
    ///
    /// The join handle is taken out of its slot, so a later call only re-cancels
    /// the token and returns. The task's join result is discarded.
    pub async fn cancel(&self) {
        self.shutdown.cancel();
        let mut slot = self.task.lock().await;
        if let Some(handle) = slot.take() {
            let _ = handle.await;
        }
    }
}
async fn fetch_once(
config: &ObserverConfig,
addr: SocketAddr,
target: NodeId,
fetch_offset: u64,
image_tx: &watch::Sender<Arc<MetadataImage>>,
) -> Option<u64> {
let req = crabka_raft::CrabkaMetadataFetchRequest {
fetch_offset: i64::try_from(fetch_offset).unwrap_or(i64::MAX),
max_bytes: i32::try_from(config.max_bytes).unwrap_or(i32::MAX),
};
let mut body = Vec::with_capacity(12);
req.encode_v0(&mut body);
let opts = crabka_client_core::ConnectionOptions {
client_id: config.client_id.clone(),
..crabka_client_core::ConnectionOptions::default()
};
let conn = match config.dialer.dial(target, &addr.to_string(), opts).await {
Ok(c) => c,
Err(e) => {
debug!(%addr, error = %e, "observer dial failed");
return None;
}
};
let resp_body = match conn
.raw_request(
crabka_raft::API_KEY_METADATA_FETCH,
0,
bytes::Bytes::from(body),
)
.await
{
Ok(b) => b,
Err(e) => {
debug!(%addr, error = %e, "observer fetch request failed");
conn.close();
return None;
}
};
conn.close();
let mut cur: &[u8] = &resp_body;
let resp = match crabka_raft::CrabkaMetadataFetchResponse::decode_v0(&mut cur) {
Ok(r) => r,
Err(e) => {
warn!(%addr, error = %e, "observer response decode failed");
return None;
}
};
if resp.error_code != 0 {
return None;
}
if resp.records.is_empty() {
return Some(fetch_offset);
}
let mut next: MetadataImage = (**image_tx.borrow()).clone();
let mut new_offset = fetch_offset;
let mut buf: &[u8] = &resp.records;
while !buf.is_empty() {
let batch = match RecordBatch::decode(&mut buf) {
Ok(b) => b,
Err(e) => {
warn!(error = %e, "observer batch decode failed");
break;
}
};
let index = u64::try_from(batch.base_offset.max(0)).unwrap_or(0);
if batch.attributes.is_control_batch() {
new_offset = index + 1;
continue;
}
for r in &batch.records {
let Some(value) = r.value.as_ref() else {
continue;
};
match from_kraft_value(value, &next) {
Ok(rec) => {
if let Err(e) = next.validate(&rec) {
warn!(error = %e, "observer skipped record failing validation");
continue;
}
next.apply(&rec);
}
Err(e) => warn!(error = %e, "observer failed to decode record"),
}
}
new_offset = index + 1;
}
if new_offset != fetch_offset {
let _ = image_tx.send_replace(Arc::new(next));
}
Some(new_offset.max(fetch_offset))
}
/// Background loop behind [`MetadataObserver`]: fetches from one voter at a time
/// until `shutdown` is cancelled.
///
/// Each iteration calls `fetch_once` against the current voter:
/// - On success the voter's id is published on the leader channel. If the offset
///   advanced, the loop fetches again immediately; otherwise it waits
///   `config.poll_interval`.
/// - On failure the loop moves on to the next voter (round-robin) and waits
///   `config.poll_interval`.
///
/// Every wait, including the idle wait used while `config.voters` is empty, is
/// raced against `shutdown` so that cancellation takes effect promptly instead of
/// after a full poll interval.
async fn run_loop(
    config: ObserverConfig,
    observer: Arc<MetadataObserver>,
    shutdown: CancellationToken,
) {
    let mut fetch_offset: u64 = 0;
    let mut target_idx: usize = 0;
    loop {
        if shutdown.is_cancelled() {
            return;
        }
        if config.voters.is_empty() {
            // Nothing to poll. Idle, but stay responsive to cancellation: a bare
            // sleep here would make `cancel()` block for up to `poll_interval`.
            tokio::select! {
                () = shutdown.cancelled() => return,
                () = tokio::time::sleep(config.poll_interval) => {}
            }
            continue;
        }
        let (target, addr) = config.voters[target_idx % config.voters.len()];
        // Abandon an in-flight fetch as soon as shutdown is requested.
        let result = tokio::select! {
            () = shutdown.cancelled() => return,
            r = fetch_once(&config, addr, target, fetch_offset, &observer.image) => r,
        };
        if let Some(new_offset) = result {
            let _ = observer.leader.send_replace(Some(target));
            if new_offset == fetch_offset {
                // Caught up: back off before polling again.
                tokio::select! {
                    () = shutdown.cancelled() => return,
                    () = tokio::time::sleep(config.poll_interval) => {}
                }
            } else {
                // Progress was made: fetch again right away to drain any backlog.
                fetch_offset = new_offset;
            }
        } else {
            // This voter could not serve the fetch; try the next one after a pause.
            target_idx = target_idx.wrapping_add(1);
            tokio::select! {
                () = shutdown.cancelled() => return,
                () = tokio::time::sleep(config.poll_interval) => {}
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use assert2::assert;
    use crabka_metadata::{MetadataRecord, TopicRecord};
    use crabka_raft::{BootstrapMode, Controller, ControllerConfig};
    use tempfile::TempDir;
    use uuid::Uuid;

    /// Starts a bootstrapped controller, submits one topic record, and checks that
    /// an observer pointed at that controller publishes an image containing the
    /// topic within five seconds.
    #[tokio::test]
    async fn observer_replicates_committed_topic() {
        let data_dir = TempDir::new().unwrap();
        let controller_config = ControllerConfig {
            bootstrap_mode: BootstrapMode::Bootstrap,
            ..ControllerConfig::for_tests(1, data_dir.path().to_path_buf())
        };
        let controller = Controller::start(controller_config)
            .await
            .expect("bootstrap");

        // Wait until the controller reports a leader before submitting anything.
        let mut elected = controller.watch_leader();
        while elected.borrow().is_none() {
            elected.changed().await.unwrap();
        }
        let controller_addr = controller.controller_bound_addr();

        let topic = TopicRecord {
            name: "observed".into(),
            topic_id: Uuid::new_v4(),
            partitions: 1,
            replication_factor: 1,
        };
        controller
            .submit_change(vec![MetadataRecord::V1Topic(topic)])
            .await
            .expect("submit");

        let observer_config = ObserverConfig {
            voters: vec![(1, controller_addr)],
            dialer: Arc::new(crabka_raft::PlaintextDialer),
            client_id: "test-observer".into(),
            cluster_id: Uuid::nil(),
            max_bytes: 1_048_576,
            poll_interval: Duration::from_millis(50),
        };
        let observer = MetadataObserver::start(observer_config);

        // Poll the image channel until the topic shows up or the deadline passes.
        let mut images = observer.watch_image();
        let give_up_at = tokio::time::Instant::now() + Duration::from_secs(5);
        while images.borrow().topic("observed").is_none() {
            assert!(
                tokio::time::Instant::now() <= give_up_at,
                "observer did not replicate topic within 5s"
            );
            // Bounded wait so the deadline is re-checked even without a new image.
            let _ = tokio::time::timeout(Duration::from_millis(200), images.changed()).await;
        }

        observer.cancel().await;
        controller.shutdown().await;
    }
}