atomr_telemetry/
sharding.rs1use parking_lot::RwLock;
5
6use crate::bus::{TelemetryBus, TelemetryEvent};
7#[cfg(any(feature = "sharding", test))]
8use crate::dto::ShardRegionInfo;
9use crate::dto::{ShardingEvent, ShardingSnapshot};
10
11pub struct ShardingProbe {
12 bus: TelemetryBus,
13 snapshot: RwLock<ShardingSnapshot>,
14}
15
16impl ShardingProbe {
17 pub fn new(bus: TelemetryBus) -> Self {
18 Self { bus, snapshot: RwLock::new(ShardingSnapshot::default()) }
19 }
20
21 pub fn set_snapshot(&self, snap: ShardingSnapshot) {
22 *self.snapshot.write() = snap;
23 }
24
25 pub fn snapshot(&self) -> ShardingSnapshot {
26 self.snapshot.read().clone()
27 }
28
29 pub fn record_shard_event(&self, region_id: &str, shard_id: &str, event: &str) {
30 self.bus.publish(TelemetryEvent::ShardingChanged(ShardingEvent {
31 region_id: region_id.to_string(),
32 shard_id: shard_id.to_string(),
33 event: event.to_string(),
34 }));
35 }
36}
37
/// Builds a `ShardRegionInfo` from a live shard region.
///
/// Reads the region's id (converted with `to_string`), its shard count and
/// its shard ids, in that order. Only compiled with the `sharding` feature.
#[cfg(feature = "sharding")]
pub fn region_info<E>(region: &atomr_cluster_sharding::ShardRegion<E>) -> ShardRegionInfo
where
    E: atomr_cluster_sharding::MessageExtractor,
{
    let region_id = region.region_id().to_string();
    let shard_count = region.shard_count();
    let shards = region.shard_ids();
    ShardRegionInfo { region_id, shard_count, shards }
}
50
/// Returns the allocations reported by `coord.allocations()`, unchanged.
///
/// NOTE(review): each pair is presumably (shard id, region id) — confirm
/// against `ShardCoordinator::allocations`. Only compiled with the
/// `sharding` feature.
#[cfg(feature = "sharding")]
pub fn coordinator_allocations(
    coord: &atomr_cluster_sharding::ShardCoordinator,
) -> Vec<(String, String)> {
    coord.allocations()
}
56
57impl ShardingProbe {
58 #[cfg(feature = "sharding")]
61 pub fn refresh_from<E: atomr_cluster_sharding::MessageExtractor>(
62 &self,
63 regions: &[&atomr_cluster_sharding::ShardRegion<E>],
64 coordinator: &atomr_cluster_sharding::ShardCoordinator,
65 ) {
66 let regions = regions.iter().map(|r| region_info(*r)).collect();
67 let allocations = coordinator_allocations(coordinator);
68 self.set_snapshot(ShardingSnapshot { regions, allocations });
69 }
70}
71
#[cfg(test)]
mod tests {
    use super::*;

    /// Stores a snapshot, reads it back, then checks that a recorded shard
    /// event arrives on a bus subscription with the "sharding" topic.
    #[tokio::test]
    async fn set_snapshot_and_event() {
        let bus = TelemetryBus::new(8);
        // Subscribe before the bus is moved into the probe.
        let mut events = bus.subscribe();
        let probe = ShardingProbe::new(bus);

        let region = ShardRegionInfo {
            region_id: "r1".into(),
            shard_count: 3,
            shards: vec!["s1".into()],
        };
        probe.set_snapshot(ShardingSnapshot { regions: vec![region], allocations: Vec::new() });

        let stored = probe.snapshot();
        assert_eq!(stored.regions[0].shard_count, 3);

        probe.record_shard_event("r1", "s1", "started");
        let received = events.recv().await.unwrap();
        assert_eq!(received.topic(), "sharding");
    }
}