coerce/remote/api/sharding/
cluster.rs1use crate::actor::context::ActorContext;
2use crate::actor::message::{Handler, Message};
3use crate::actor::{ActorRef, LocalActorRef};
4use crate::remote::api::sharding::ShardingApi;
5use crate::remote::system::RemoteActorSystem;
6use crate::remote::RemoteActorRef;
7use crate::sharding::coordinator::stats::{GetShardingStats, NodeStats};
8use crate::sharding::host::stats::{GetStats, HostStats, RemoteShard};
9use crate::sharding::host::{shard_actor_id, GetCoordinator};
10use crate::sharding::shard::stats::GetShardStats;
11use crate::sharding::shard::stats::ShardStats as ShardActorStats;
12use crate::sharding::shard::Shard;
13use axum::extract::Path;
14use axum::response::IntoResponse;
15use axum::Json;
16use futures::future::join_all;
17use tokio::sync::oneshot;
18
/// Request handled by [`ShardingApi`] asking for the cluster-wide sharding
/// statistics of a single sharded entity type.
pub struct GetClusterStats {
    /// Name of the sharded entity type; used as the key into the API's
    /// registered shard hosts.
    entity_type: String,
}
22
23type ClusterStatsReceiver = oneshot::Receiver<Option<ShardingClusterStats>>;
24
25impl Message for GetClusterStats {
26 type Result = Option<ClusterStatsReceiver>;
27}
28
/// Request handled by [`ShardingApi`] asking for the statistics of the shard
/// host registered for a single sharded entity type.
pub struct GetHostStats {
    /// Name of the sharded entity type; used as the key into the API's
    /// registered shard hosts.
    entity_type: String,
}
32
33type HostStatsReceiver = oneshot::Receiver<Option<HostStats>>;
34
35impl Message for GetHostStats {
36 type Result = Option<HostStatsReceiver>;
37}
38
39#[derive(Serialize, ToSchema)]
40pub enum ShardHostStatus {
41 Unknown,
42 Starting,
43 Ready,
44 Unavailable,
45}
46
47impl ShardHostStatus {
48 pub fn is_available(&self) -> bool {
49 matches!(&self, Self::Ready)
50 }
51}
52
53#[derive(Serialize, ToSchema)]
54pub struct ShardingNode {
55 pub node_id: u64,
56 pub shard_count: u64,
57 pub status: ShardHostStatus,
58}
59
60#[derive(Serialize, ToSchema)]
61pub struct Entity {
62 actor_id: String,
63}
64
65#[derive(Serialize, ToSchema)]
66pub struct ShardStats {
67 shard_id: u32,
68 node_id: u64,
69 entity_count: u32,
70 entities: Vec<Entity>,
71}
72
73#[derive(Serialize, ToSchema)]
74pub struct ShardingClusterStats {
75 pub entity_type: String,
76 pub total_shards: u64,
77 pub total_nodes: u32,
78 pub available_nodes: u32,
79 pub total_entities: u32,
80 pub nodes: Vec<ShardingNode>,
81 pub shards: Vec<ShardStats>,
82}
83
84#[async_trait]
85impl Handler<GetHostStats> for ShardingApi {
86 async fn handle(
87 &mut self,
88 message: GetHostStats,
89 _ctx: &mut ActorContext,
90 ) -> Option<HostStatsReceiver> {
91 let shard_host = self.shard_hosts.get(&message.entity_type);
92 if shard_host.is_none() {
93 return None;
94 }
95
96 let shard_host = shard_host.unwrap().clone();
97
98 let (tx, rx) = oneshot::channel();
99 tokio::spawn(async move {
100 let receiver = shard_host.send(GetStats).await;
101 if let Ok(receiver) = receiver {
102 let result = receiver.await;
103 if let Ok(stats) = result {
104 let _ = tx.send(Some(stats));
105 } else {
106 let _ = tx.send(None);
107 }
108 } else {
109 let _ = tx.send(None);
110 }
111 });
112
113 Some(rx)
114 }
115}
116
117#[async_trait]
118impl Handler<GetClusterStats> for ShardingApi {
119 async fn handle(
120 &mut self,
121 message: GetClusterStats,
122 ctx: &mut ActorContext,
123 ) -> Option<ClusterStatsReceiver> {
124 let shard_host = self.shard_hosts.get(&message.entity_type);
125 if shard_host.is_none() {
126 return None;
127 }
128
129 let remote = ctx.system().remote_owned();
130 let shard_host = shard_host.unwrap().clone();
131 let (tx, rx) = oneshot::channel();
132
133 tokio::spawn(async move {
134 let remote = remote;
135 let shard_host = shard_host;
136 let coordinator = shard_host.send(GetCoordinator).await;
137
138 if let Ok(coordinator) = coordinator {
139 let sharding_stats = coordinator.send(GetShardingStats).await.unwrap();
140 let shards: Vec<ActorRef<Shard>> =
141 get_shards(&message.entity_type, &remote, sharding_stats.shards).await;
142 let nodes: Vec<ShardingNode> =
143 sharding_stats.nodes.into_iter().map(|s| s.into()).collect();
144 let shards: Vec<ShardStats> =
145 join_all(shards.iter().map(|f| f.send(GetShardStats)))
146 .await
147 .into_iter()
148 .map(|s| {
149 s .unwrap()
151 .into()
152 })
153 .collect();
154
155 let _ = tx.send(Some(ShardingClusterStats {
156 entity_type: sharding_stats.entity_type,
157 total_shards: sharding_stats.total_shards,
158 total_nodes: nodes.len() as u32,
159 available_nodes: nodes.iter().filter(|n| n.status.is_available()).count()
160 as u32,
161 total_entities: shards.iter().map(|s| s.entity_count).sum(),
162 nodes,
163 shards,
164 }));
165 } else {
166 let _ = tx.send(None);
167 }
168 });
169
170 Some(rx)
171 }
172}
173
174async fn get_shards(
175 shard_entity: &String,
176 remote: &RemoteActorSystem,
177 shards: Vec<RemoteShard>,
178) -> Vec<ActorRef<Shard>> {
179 let mut actor_refs = vec![];
180 for shard in shards {
181 let actor_id = shard_actor_id(shard_entity, shard.shard_id);
182 let shard_actor_ref = if shard.node_id == remote.node_id() {
183 let local_actor = remote.actor_system().get_tracked_actor(actor_id).await;
187 if let Some(actor) = local_actor {
188 actor.into()
189 } else {
190 warn!(
191 "could not find local shard actor (actor_id={})",
192 shard_actor_id(&shard_entity, shard.shard_id)
193 );
194 continue;
195 }
196 } else {
197 RemoteActorRef::<Shard>::new(actor_id, shard.node_id, remote.clone()).into()
198 };
199
200 actor_refs.push(shard_actor_ref);
201 }
202
203 actor_refs
204}
205
206#[utoipa::path(
207 get,
208 path = "/sharding/stats/cluster/{entity}",
209 responses(
210 (status = 200, description = "Sharding stats for the the chosen entity type", body = ShardingClusterStats),
211 ),
212 params(
213 ("entity" = String, Path, description = "Sharded entity type name"),
214 )
215)]
216pub async fn get_sharding_stats(
217 sharding_api: LocalActorRef<ShardingApi>,
218 Path(entity_type): Path<String>,
219) -> impl IntoResponse {
220 let cluster_stats: Option<ClusterStatsReceiver> = sharding_api
221 .send(GetClusterStats { entity_type })
222 .await
223 .unwrap();
224 if let Some(cluster_stats_receiver) = cluster_stats {
225 Json(cluster_stats_receiver.await.unwrap())
226 } else {
227 Json(None)
228 }
229}
230
231pub async fn get_shard_host_stats(
232 sharding_api: LocalActorRef<ShardingApi>,
233 Path(entity_type): Path<String>,
234) -> impl IntoResponse {
235 let host_stats: Option<HostStatsReceiver> = sharding_api
236 .send(GetHostStats { entity_type })
237 .await
238 .unwrap();
239 if let Some(host_stats) = host_stats {
240 Json(host_stats.await.unwrap())
241 } else {
242 Json(None)
243 }
244}
245
246impl From<ShardActorStats> for ShardStats {
247 fn from(s: ShardActorStats) -> Self {
248 Self {
249 shard_id: s.shard_id,
250 entity_count: s.entities.len() as u32,
251 node_id: s.node_id,
252 entities: s
253 .entities
254 .into_iter()
255 .map(|e| Entity {
256 actor_id: e.to_string(),
257 })
258 .collect(),
259 }
260 }
261}
262
263impl From<crate::sharding::coordinator::ShardHostStatus> for ShardHostStatus {
264 fn from(value: crate::sharding::coordinator::ShardHostStatus) -> Self {
265 match value {
266 crate::sharding::coordinator::ShardHostStatus::Unknown => Self::Unknown,
267 crate::sharding::coordinator::ShardHostStatus::Starting => Self::Starting,
268 crate::sharding::coordinator::ShardHostStatus::Ready => Self::Ready,
269 crate::sharding::coordinator::ShardHostStatus::Unavailable => Self::Unavailable,
270 }
271 }
272}
273
274impl From<NodeStats> for ShardingNode {
275 fn from(node: NodeStats) -> Self {
276 Self {
277 node_id: node.node_id,
278 shard_count: node.shard_count,
279 status: node.status.into(),
280 }
281 }
282}