coerce/remote/api/sharding/
cluster.rs

1use crate::actor::context::ActorContext;
2use crate::actor::message::{Handler, Message};
3use crate::actor::{ActorRef, LocalActorRef};
4use crate::remote::api::sharding::ShardingApi;
5use crate::remote::system::RemoteActorSystem;
6use crate::remote::RemoteActorRef;
7use crate::sharding::coordinator::stats::{GetShardingStats, NodeStats};
8use crate::sharding::host::stats::{GetStats, HostStats, RemoteShard};
9use crate::sharding::host::{shard_actor_id, GetCoordinator};
10use crate::sharding::shard::stats::GetShardStats;
11use crate::sharding::shard::stats::ShardStats as ShardActorStats;
12use crate::sharding::shard::Shard;
13use axum::extract::Path;
14use axum::response::IntoResponse;
15use axum::Json;
16use futures::future::join_all;
17use tokio::sync::oneshot;
18
19pub struct GetClusterStats {
20    entity_type: String,
21}
22
23type ClusterStatsReceiver = oneshot::Receiver<Option<ShardingClusterStats>>;
24
25impl Message for GetClusterStats {
26    type Result = Option<ClusterStatsReceiver>;
27}
28
29pub struct GetHostStats {
30    entity_type: String,
31}
32
33type HostStatsReceiver = oneshot::Receiver<Option<HostStats>>;
34
35impl Message for GetHostStats {
36    type Result = Option<HostStatsReceiver>;
37}
38
39#[derive(Serialize, ToSchema)]
40pub enum ShardHostStatus {
41    Unknown,
42    Starting,
43    Ready,
44    Unavailable,
45}
46
47impl ShardHostStatus {
48    pub fn is_available(&self) -> bool {
49        matches!(&self, Self::Ready)
50    }
51}
52
/// Per-node entry of a cluster sharding snapshot.
#[derive(Serialize, ToSchema)]
pub struct ShardingNode {
    /// Identifier of the node.
    pub node_id: u64,
    /// Number of shards the shard coordinator reports for this node.
    pub shard_count: u64,
    /// Shard host status reported by the coordinator; only `Ready` counts as available.
    pub status: ShardHostStatus,
}
59
/// A single entity reported by a shard.
#[derive(Serialize, ToSchema)]
pub struct Entity {
    /// String form of the entity identifier reported by the shard.
    actor_id: String,
}
64
/// Per-shard entry of a cluster sharding snapshot, built from a shard actor's own stats.
#[derive(Serialize, ToSchema)]
pub struct ShardStats {
    /// Identifier of the shard.
    shard_id: u32,
    /// Node the shard reported in its stats.
    node_id: u64,
    /// Number of entries in `entities`.
    entity_count: u32,
    /// Entities the shard reported.
    entities: Vec<Entity>,
}
72
/// Cluster-wide sharding snapshot for one entity type.
#[derive(Serialize, ToSchema)]
pub struct ShardingClusterStats {
    /// Sharded entity type these stats describe.
    pub entity_type: String,
    /// Total shard count reported by the shard coordinator.
    pub total_shards: u64,
    /// Number of entries in `nodes`.
    pub total_nodes: u32,
    /// Number of entries in `nodes` whose status is `Ready`.
    pub available_nodes: u32,
    /// Sum of `entity_count` across `shards`.
    pub total_entities: u32,
    /// Per-node breakdown reported by the shard coordinator.
    pub nodes: Vec<ShardingNode>,
    /// Per-shard breakdown collected from the shard actors.
    pub shards: Vec<ShardStats>,
}
83
84#[async_trait]
85impl Handler<GetHostStats> for ShardingApi {
86    async fn handle(
87        &mut self,
88        message: GetHostStats,
89        _ctx: &mut ActorContext,
90    ) -> Option<HostStatsReceiver> {
91        let shard_host = self.shard_hosts.get(&message.entity_type);
92        if shard_host.is_none() {
93            return None;
94        }
95
96        let shard_host = shard_host.unwrap().clone();
97
98        let (tx, rx) = oneshot::channel();
99        tokio::spawn(async move {
100            let receiver = shard_host.send(GetStats).await;
101            if let Ok(receiver) = receiver {
102                let result = receiver.await;
103                if let Ok(stats) = result {
104                    let _ = tx.send(Some(stats));
105                } else {
106                    let _ = tx.send(None);
107                }
108            } else {
109                let _ = tx.send(None);
110            }
111        });
112
113        Some(rx)
114    }
115}
116
117#[async_trait]
118impl Handler<GetClusterStats> for ShardingApi {
119    async fn handle(
120        &mut self,
121        message: GetClusterStats,
122        ctx: &mut ActorContext,
123    ) -> Option<ClusterStatsReceiver> {
124        let shard_host = self.shard_hosts.get(&message.entity_type);
125        if shard_host.is_none() {
126            return None;
127        }
128
129        let remote = ctx.system().remote_owned();
130        let shard_host = shard_host.unwrap().clone();
131        let (tx, rx) = oneshot::channel();
132
133        tokio::spawn(async move {
134            let remote = remote;
135            let shard_host = shard_host;
136            let coordinator = shard_host.send(GetCoordinator).await;
137
138            if let Ok(coordinator) = coordinator {
139                let sharding_stats = coordinator.send(GetShardingStats).await.unwrap();
140                let shards: Vec<ActorRef<Shard>> =
141                    get_shards(&message.entity_type, &remote, sharding_stats.shards).await;
142                let nodes: Vec<ShardingNode> =
143                    sharding_stats.nodes.into_iter().map(|s| s.into()).collect();
144                let shards: Vec<ShardStats> =
145                    join_all(shards.iter().map(|f| f.send(GetShardStats)))
146                        .await
147                        .into_iter()
148                        .map(|s| {
149                            s /*todo: this unwrap may not be safe..*/
150                                .unwrap()
151                                .into()
152                        })
153                        .collect();
154
155                let _ = tx.send(Some(ShardingClusterStats {
156                    entity_type: sharding_stats.entity_type,
157                    total_shards: sharding_stats.total_shards,
158                    total_nodes: nodes.len() as u32,
159                    available_nodes: nodes.iter().filter(|n| n.status.is_available()).count()
160                        as u32,
161                    total_entities: shards.iter().map(|s| s.entity_count).sum(),
162                    nodes,
163                    shards,
164                }));
165            } else {
166                let _ = tx.send(None);
167            }
168        });
169
170        Some(rx)
171    }
172}
173
174async fn get_shards(
175    shard_entity: &String,
176    remote: &RemoteActorSystem,
177    shards: Vec<RemoteShard>,
178) -> Vec<ActorRef<Shard>> {
179    let mut actor_refs = vec![];
180    for shard in shards {
181        let actor_id = shard_actor_id(shard_entity, shard.shard_id);
182        let shard_actor_ref = if shard.node_id == remote.node_id() {
183            // TODO: Can we grab the refs directly from the ShardHost rather than having to look them up one by one?
184            //       OR can we grab a batch of refs from ActorScheduler?
185
186            let local_actor = remote.actor_system().get_tracked_actor(actor_id).await;
187            if let Some(actor) = local_actor {
188                actor.into()
189            } else {
190                warn!(
191                    "could not find local shard actor (actor_id={})",
192                    shard_actor_id(&shard_entity, shard.shard_id)
193                );
194                continue;
195            }
196        } else {
197            RemoteActorRef::<Shard>::new(actor_id, shard.node_id, remote.clone()).into()
198        };
199
200        actor_refs.push(shard_actor_ref);
201    }
202
203    actor_refs
204}
205
206#[utoipa::path(
207    get,
208    path = "/sharding/stats/cluster/{entity}",
209    responses(
210        (status = 200, description = "Sharding stats for the the chosen entity type", body = ShardingClusterStats),
211    ),
212    params(
213        ("entity" = String, Path, description = "Sharded entity type name"),
214    )
215)]
216pub async fn get_sharding_stats(
217    sharding_api: LocalActorRef<ShardingApi>,
218    Path(entity_type): Path<String>,
219) -> impl IntoResponse {
220    let cluster_stats: Option<ClusterStatsReceiver> = sharding_api
221        .send(GetClusterStats { entity_type })
222        .await
223        .unwrap();
224    if let Some(cluster_stats_receiver) = cluster_stats {
225        Json(cluster_stats_receiver.await.unwrap())
226    } else {
227        Json(None)
228    }
229}
230
231pub async fn get_shard_host_stats(
232    sharding_api: LocalActorRef<ShardingApi>,
233    Path(entity_type): Path<String>,
234) -> impl IntoResponse {
235    let host_stats: Option<HostStatsReceiver> = sharding_api
236        .send(GetHostStats { entity_type })
237        .await
238        .unwrap();
239    if let Some(host_stats) = host_stats {
240        Json(host_stats.await.unwrap())
241    } else {
242        Json(None)
243    }
244}
245
246impl From<ShardActorStats> for ShardStats {
247    fn from(s: ShardActorStats) -> Self {
248        Self {
249            shard_id: s.shard_id,
250            entity_count: s.entities.len() as u32,
251            node_id: s.node_id,
252            entities: s
253                .entities
254                .into_iter()
255                .map(|e| Entity {
256                    actor_id: e.to_string(),
257                })
258                .collect(),
259        }
260    }
261}
262
263impl From<crate::sharding::coordinator::ShardHostStatus> for ShardHostStatus {
264    fn from(value: crate::sharding::coordinator::ShardHostStatus) -> Self {
265        match value {
266            crate::sharding::coordinator::ShardHostStatus::Unknown => Self::Unknown,
267            crate::sharding::coordinator::ShardHostStatus::Starting => Self::Starting,
268            crate::sharding::coordinator::ShardHostStatus::Ready => Self::Ready,
269            crate::sharding::coordinator::ShardHostStatus::Unavailable => Self::Unavailable,
270        }
271    }
272}
273
274impl From<NodeStats> for ShardingNode {
275    fn from(node: NodeStats) -> Self {
276        Self {
277            node_id: node.node_id,
278            shard_count: node.shard_count,
279            status: node.status.into(),
280        }
281    }
282}