use parking_lot::RwLock;
use crate::bus::{TelemetryBus, TelemetryEvent};
#[cfg(any(feature = "sharding", test))]
use crate::dto::ShardRegionInfo;
use crate::dto::{ShardingEvent, ShardingSnapshot};
pub struct ShardingProbe {
bus: TelemetryBus,
snapshot: RwLock<ShardingSnapshot>,
}
impl ShardingProbe {
pub fn new(bus: TelemetryBus) -> Self {
Self { bus, snapshot: RwLock::new(ShardingSnapshot::default()) }
}
pub fn set_snapshot(&self, snap: ShardingSnapshot) {
*self.snapshot.write() = snap;
}
pub fn snapshot(&self) -> ShardingSnapshot {
self.snapshot.read().clone()
}
pub fn record_shard_event(&self, region_id: &str, shard_id: &str, event: &str) {
self.bus.publish(TelemetryEvent::ShardingChanged(ShardingEvent {
region_id: region_id.to_string(),
shard_id: shard_id.to_string(),
event: event.to_string(),
}));
}
}
#[cfg(feature = "sharding")]
pub fn region_info<E: atomr_cluster_sharding::MessageExtractor>(
region: &atomr_cluster_sharding::ShardRegion<E>,
) -> ShardRegionInfo {
ShardRegionInfo {
region_id: region.region_id().to_string(),
shard_count: region.shard_count(),
shards: region.shard_ids(),
}
}
#[cfg(feature = "sharding")]
pub fn coordinator_allocations(coord: &atomr_cluster_sharding::ShardCoordinator) -> Vec<(String, String)> {
coord.allocations()
}
impl ShardingProbe {
#[cfg(feature = "sharding")]
pub fn refresh_from<E: atomr_cluster_sharding::MessageExtractor>(
&self,
regions: &[&atomr_cluster_sharding::ShardRegion<E>],
coordinator: &atomr_cluster_sharding::ShardCoordinator,
) {
let regions = regions.iter().map(|r| region_info(*r)).collect();
let allocations = coordinator_allocations(coordinator);
self.set_snapshot(ShardingSnapshot { regions, allocations });
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn set_snapshot_and_event() {
let bus = TelemetryBus::new(8);
let mut rx = bus.subscribe();
let probe = ShardingProbe::new(bus);
probe.set_snapshot(ShardingSnapshot {
regions: vec![ShardRegionInfo {
region_id: "r1".into(),
shard_count: 3,
shards: vec!["s1".into()],
}],
allocations: vec![],
});
assert_eq!(probe.snapshot().regions[0].shard_count, 3);
probe.record_shard_event("r1", "s1", "started");
let e = rx.recv().await.unwrap();
assert_eq!(e.topic(), "sharding");
}
}