coerce 0.8.6

Async actor runtime and distributed systems framework
Documentation
use crate::actor::context::ActorContext;
use crate::actor::message::{Handler, Message, MessageUnwrapErr, MessageWrapErr};
use crate::remote::system::NodeId;
use crate::sharding::coordinator::{ShardCoordinator, ShardHostStatus};
use crate::sharding::host::stats::RemoteShard;
use crate::sharding::proto::sharding as proto;
use protobuf::{EnumOrUnknown, Message as ProtoMessage};

pub struct GetShardingStats;

pub struct NodeStats {
    pub node_id: NodeId,
    pub shard_count: u64,
    pub status: ShardHostStatus,
}

pub struct ShardingStats {
    pub entity_type: String,
    pub total_shards: u64,
    pub shards: Vec<RemoteShard>,
    pub nodes: Vec<NodeStats>,
}

#[async_trait]
impl Handler<GetShardingStats> for ShardCoordinator {
    async fn handle(&mut self, _msg: GetShardingStats, _ctx: &mut ActorContext) -> ShardingStats {
        ShardingStats {
            entity_type: self.shard_entity.clone(),
            total_shards: self.shards.len() as u64,
            shards: self
                .shards
                .iter()
                .map(|s| RemoteShard {
                    shard_id: *s.0,
                    node_id: *s.1,
                })
                .collect(),
            nodes: self
                .hosts
                .iter()
                .map(|n| NodeStats {
                    node_id: n.1.node_id,
                    shard_count: n.1.shards.len() as u64,
                    status: n.1.status,
                })
                .collect(),
        }
    }
}

impl Message for GetShardingStats {
    type Result = ShardingStats;

    fn as_bytes(&self) -> Result<Vec<u8>, MessageWrapErr> {
        proto::GetShardingStats {
            ..Default::default()
        }
        .write_to_bytes()
        .map_err(|_| MessageWrapErr::SerializationErr)
    }

    fn from_bytes(bytes: Vec<u8>) -> Result<Self, MessageUnwrapErr> {
        proto::GetShardingStats::parse_from_bytes(&bytes).map_or_else(
            |_| Err(MessageUnwrapErr::DeserializationErr),
            |_m| Ok(GetShardingStats),
        )
    }

    fn read_remote_result(res: Vec<u8>) -> Result<Self::Result, MessageUnwrapErr> {
        let proto_stats = proto::ShardingStats::parse_from_bytes(&res);
        if let Ok(stats) = proto_stats {
            Ok(ShardingStats {
                entity_type: stats.entity_type,
                total_shards: stats.total_shards,
                shards: stats
                    .shards
                    .into_iter()
                    .map(|s| RemoteShard {
                        shard_id: s.shard_id,
                        node_id: s.node_id,
                    })
                    .collect(),
                nodes: stats
                    .nodes
                    .into_iter()
                    .map(|n| NodeStats {
                        node_id: n.node_id,
                        shard_count: n.shard_count,
                        status: n.status.unwrap().into(),
                    })
                    .collect(),
            })
        } else {
            Err(MessageUnwrapErr::DeserializationErr)
        }
    }

    fn write_remote_result(res: ShardingStats) -> Result<Vec<u8>, MessageWrapErr> {
        proto::ShardingStats {
            entity_type: res.entity_type,
            total_shards: res.total_shards,
            shards: res
                .shards
                .into_iter()
                .map(|s| proto::RemoteShard {
                    shard_id: s.shard_id,
                    node_id: s.node_id,
                    ..Default::default()
                })
                .collect(),
            nodes: res
                .nodes
                .into_iter()
                .map(|n| proto::NodeStats {
                    node_id: n.node_id,
                    shard_count: n.shard_count,
                    status: EnumOrUnknown::new(n.status.into()),
                    ..Default::default()
                })
                .collect(),
            ..Default::default()
        }
        .write_to_bytes()
        .map_or_else(|_| Err(MessageWrapErr::SerializationErr), |b| Ok(b))
    }
}

impl Into<proto::ShardHostStatus> for ShardHostStatus {
    fn into(self) -> proto::ShardHostStatus {
        match self {
            ShardHostStatus::Unknown => proto::ShardHostStatus::UNKNOWN,
            ShardHostStatus::Starting => proto::ShardHostStatus::STARTING,
            ShardHostStatus::Ready => proto::ShardHostStatus::READY,
            ShardHostStatus::Unavailable => proto::ShardHostStatus::UNAVAILABLE,
        }
    }
}

impl From<proto::ShardHostStatus> for ShardHostStatus {
    fn from(status: proto::ShardHostStatus) -> Self {
        match status {
            proto::ShardHostStatus::UNKNOWN => Self::Unknown,
            proto::ShardHostStatus::STARTING => Self::Starting,
            proto::ShardHostStatus::READY => Self::Ready,
            proto::ShardHostStatus::UNAVAILABLE => Self::Unavailable,
        }
    }
}