Skip to main content

atomr_telemetry/
sharding.rs

1//! Sharding probe — snapshots `ShardRegion` / `ShardCoordinator` state
2//! when the `sharding` feature is enabled.
3
4use parking_lot::RwLock;
5
6use crate::bus::{TelemetryBus, TelemetryEvent};
7#[cfg(any(feature = "sharding", test))]
8use crate::dto::ShardRegionInfo;
9use crate::dto::{ShardingEvent, ShardingSnapshot};
10
11pub struct ShardingProbe {
12    bus: TelemetryBus,
13    snapshot: RwLock<ShardingSnapshot>,
14}
15
16impl ShardingProbe {
17    pub fn new(bus: TelemetryBus) -> Self {
18        Self { bus, snapshot: RwLock::new(ShardingSnapshot::default()) }
19    }
20
21    pub fn set_snapshot(&self, snap: ShardingSnapshot) {
22        *self.snapshot.write() = snap;
23    }
24
25    pub fn snapshot(&self) -> ShardingSnapshot {
26        self.snapshot.read().clone()
27    }
28
29    pub fn record_shard_event(&self, region_id: &str, shard_id: &str, event: &str) {
30        self.bus.publish(TelemetryEvent::ShardingChanged(ShardingEvent {
31            region_id: region_id.to_string(),
32            shard_id: shard_id.to_string(),
33            event: event.to_string(),
34        }));
35    }
36}
37
38/// Build a [`ShardRegionInfo`] from a live `atomr-cluster-sharding`
39/// region. Feature-gated.
40#[cfg(feature = "sharding")]
41pub fn region_info<E: atomr_cluster_sharding::MessageExtractor>(
42    region: &atomr_cluster_sharding::ShardRegion<E>,
43) -> ShardRegionInfo {
44    ShardRegionInfo {
45        region_id: region.region_id().to_string(),
46        shard_count: region.shard_count(),
47        shards: region.shard_ids(),
48    }
49}
50
51/// Snapshot of the coordinator's shard → region allocation table.
52#[cfg(feature = "sharding")]
53pub fn coordinator_allocations(coord: &atomr_cluster_sharding::ShardCoordinator) -> Vec<(String, String)> {
54    coord.allocations()
55}
56
57impl ShardingProbe {
58    /// Convenience: refresh the probe snapshot from a list of live
59    /// regions and a coordinator.
60    #[cfg(feature = "sharding")]
61    pub fn refresh_from<E: atomr_cluster_sharding::MessageExtractor>(
62        &self,
63        regions: &[&atomr_cluster_sharding::ShardRegion<E>],
64        coordinator: &atomr_cluster_sharding::ShardCoordinator,
65    ) {
66        let regions = regions.iter().map(|r| region_info(*r)).collect();
67        let allocations = coordinator_allocations(coordinator);
68        self.set_snapshot(ShardingSnapshot { regions, allocations });
69    }
70}
71
72#[cfg(test)]
73mod tests {
74    use super::*;
75
76    #[tokio::test]
77    async fn set_snapshot_and_event() {
78        let bus = TelemetryBus::new(8);
79        let mut rx = bus.subscribe();
80        let probe = ShardingProbe::new(bus);
81        probe.set_snapshot(ShardingSnapshot {
82            regions: vec![ShardRegionInfo {
83                region_id: "r1".into(),
84                shard_count: 3,
85                shards: vec!["s1".into()],
86            }],
87            allocations: vec![],
88        });
89        assert_eq!(probe.snapshot().regions[0].shard_count, 3);
90        probe.record_shard_event("r1", "s1", "started");
91        let e = rx.recv().await.unwrap();
92        assert_eq!(e.topic(), "sharding");
93    }
94}