use std::collections::BTreeSet;
use internet2::addr::NodeId;
use microservices::rpc;
use storm::p2p::{self, AppMsg};
use storm::{ContainerFullId, ContainerId, ContainerInfo, Mesg, MesgId, StormApp, Topic};
use storm_rpc::AddressedMsg;
use strict_encoding::StrictEncode;
/// Crate-internal envelope enum for messages handled by this module.
///
/// It currently has a single variant, [`BusMsg::Ext`], wrapping an [`ExtMsg`]. The `From`
/// derive together with `#[from]` on that variant is what lets an `ExtMsg` be converted
/// into a `BusMsg`.
///
/// NOTE(review): the bus/endpoints exchanging this type are not visible in this file —
/// confirm against the service runtime before relying on any routing assumptions.
#[derive(Clone, Debug, Display, From, Api)]
#[api(encoding = "strict")]
#[non_exhaustive]
pub(crate) enum BusMsg {
/// Extension message (API type id `5`); displayed via the inner [`ExtMsg`]'s `Display`.
#[api(type = 5)]
#[display(inner)]
#[from]
Ext(ExtMsg),
}
// Opts `BusMsg` into the `microservices::rpc::Request` trait. The impl body is empty, so
// nothing is overridden here. NOTE(review): confirm which trait defaults (if any) apply.
impl rpc::Request for BusMsg {}
/// Messages exchanged with Storm extensions (applications).
///
/// Most variants wrap an [`AddressedMsg`], i.e. a payload paired with the `remote_id` of a
/// peer node; [`ExtMsg::RegisterApp`] and [`ExtMsg::ContainerRetrieved`] carry a bare value
/// with no peer attached. The helper methods on this type (`remote_id`, `p2p_message`,
/// `to_payload`) panic for the variants they do not support — see the per-variant notes.
///
/// The `#[api(type = ...)]` ids and `#[display(...)]` strings are part of the runtime
/// behavior of this type and must not be changed casually.
#[derive(Clone, Debug, Display, Api, From)]
#[derive(NetworkEncode, NetworkDecode)]
#[api(encoding = "strict")]
#[non_exhaustive]
pub enum ExtMsg {
/// Carries a [`StormApp`] without any peer address.
///
/// `remote_id`, `p2p_message` and `to_payload` all panic when called on this variant.
#[api(type = 0x0100)]
#[display("register_app({0})")]
RegisterApp(StormApp),
/// Addressed message with an empty (`()`) payload; counterpart of
/// `p2p::Messages::ListTopics`.
#[api(type = 0x0102)]
#[display("list_topics()")]
ListTopics(AddressedMsg<()>),
/// Addressed set of [`MesgId`]s; counterpart of `p2p::Messages::AppTopics`.
#[api(type = 0x0103)]
#[display("topics(...)")]
Topics(AddressedMsg<BTreeSet<MesgId>>),
/// Addressed [`Topic`]; counterpart of `p2p::Messages::ProposeTopic`.
#[api(type = 0x0006)]
#[display("propose_topic(...)")]
ProposeTopic(AddressedMsg<Topic>),
/// Addressed [`Mesg`]; counterpart of `p2p::Messages::Post`.
#[api(type = 0x0008)]
#[display("post({0})")]
Post(AddressedMsg<Mesg>),
/// Addressed [`MesgId`]; counterpart of `p2p::Messages::Read`.
///
/// NOTE(review): the display label is `post_retrieve`, which differs from the variant
/// name — confirm this is intentional before changing either.
#[api(type = 0x000a)]
#[display("post_retrieve({0})")]
Read(AddressedMsg<MesgId>),
/// Addressed [`ContainerInfo`]; counterpart of `p2p::Messages::AnnounceContainer`.
#[api(type = 0x0011)]
#[display("container_announcement({0})")]
ContainerAnnouncement(AddressedMsg<ContainerInfo>),
/// Carries a bare [`ContainerId`] without any peer address.
///
/// `remote_id` and `p2p_message` panic when called on this variant.
#[api(type = 0x0013)]
#[display("container_retrieved({0})")]
ContainerRetrieved(ContainerId),
/// Addressed [`ContainerFullId`]; produced from `p2p::Messages::PullContainer`.
///
/// `p2p_message` panics on this variant (its message states the task is handled by a
/// dedicated daemon).
#[api(type = 0x0012)]
#[display("retrieve_container({0})")]
RetrieveContainer(AddressedMsg<ContainerFullId>),
/// Addressed [`ContainerFullId`]; has no p2p counterpart in this file.
///
/// `p2p_message` panics on this variant (its message states the task is handled by a
/// dedicated daemon).
#[api(type = 0x0014)]
#[display("send_container({0})")]
SendContainer(AddressedMsg<ContainerFullId>),
/// Addressed [`MesgId`]; counterpart of `p2p::Messages::Decline`.
#[api(type = 0x001c)]
#[display("decline({0})")]
Decline(AddressedMsg<MesgId>),
/// Addressed [`MesgId`]; counterpart of `p2p::Messages::Accept`.
#[api(type = 0x001e)]
#[display("accept({0})")]
Accept(AddressedMsg<MesgId>),
}
/// Conversion of a message into its extension-facing [`ExtMsg`] representation.
pub trait StormExtMsg {
/// Consumes `self` and returns the [`StormApp`] it belongs to together with the matching
/// [`ExtMsg`], using `remote_id` as the peer address of the resulting message.
///
/// # Errors
///
/// The error type is `Self`: the implementation for `p2p::Messages` in this file hands the
/// original, unconverted value back in `Err` when it has no `ExtMsg` counterpart.
fn storm_ext_msg(self, remote_id: NodeId) -> Result<(StormApp, ExtMsg), Self>
where Self: Sized;
}
impl StormExtMsg for p2p::Messages {
fn storm_ext_msg(self, remote_id: NodeId) -> Result<(StormApp, ExtMsg), Self> {
Ok(match self {
p2p::Messages::ListTopics(AppMsg { data, app }) => {
(app, ExtMsg::ListTopics(AddressedMsg { remote_id, data }))
}
p2p::Messages::AppTopics(AppMsg { data, app }) => {
(app, ExtMsg::Topics(AddressedMsg { remote_id, data }))
}
p2p::Messages::ProposeTopic(AppMsg { data, app }) => {
(app, ExtMsg::ProposeTopic(AddressedMsg { remote_id, data }))
}
p2p::Messages::Post(AppMsg { data, app }) => {
(app, ExtMsg::Post(AddressedMsg { remote_id, data }))
}
p2p::Messages::Read(AppMsg { data, app }) => {
(app, ExtMsg::Read(AddressedMsg { remote_id, data }))
}
p2p::Messages::AnnounceContainer(AppMsg { data, app }) => {
(app, ExtMsg::ContainerAnnouncement(AddressedMsg { remote_id, data }))
}
p2p::Messages::PullContainer(AppMsg { data, app }) => {
(app, ExtMsg::RetrieveContainer(AddressedMsg { remote_id, data }))
}
p2p::Messages::Decline(AppMsg { data, app }) => {
(app, ExtMsg::Decline(AddressedMsg { remote_id, data }))
}
p2p::Messages::Accept(AppMsg { data, app }) => {
(app, ExtMsg::Accept(AddressedMsg { remote_id, data }))
}
other => return Err(other),
})
}
}
impl ExtMsg {
    /// Returns the id of the remote node this message is addressed to / received from.
    ///
    /// # Panics
    ///
    /// Panics when called on [`ExtMsg::RegisterApp`] or [`ExtMsg::ContainerRetrieved`],
    /// which carry no [`AddressedMsg`] and therefore have no remote id.
    pub fn remote_id(&self) -> NodeId {
        match self {
            ExtMsg::RegisterApp(_) => {
                unreachable!("ExtMsg::remote_id must not be called on ExtMsg::RegisterApp")
            }
            ExtMsg::ContainerRetrieved(_) => {
                unreachable!("ExtMsg::remote_id must not be called on ExtMsg::ContainerRetrieved")
            }
            ExtMsg::ListTopics(AddressedMsg { remote_id, .. })
            | ExtMsg::Topics(AddressedMsg { remote_id, .. })
            | ExtMsg::ProposeTopic(AddressedMsg { remote_id, .. })
            | ExtMsg::Post(AddressedMsg { remote_id, .. })
            | ExtMsg::Read(AddressedMsg { remote_id, .. })
            | ExtMsg::ContainerAnnouncement(AddressedMsg { remote_id, .. })
            | ExtMsg::RetrieveContainer(AddressedMsg { remote_id, .. })
            | ExtMsg::SendContainer(AddressedMsg { remote_id, .. })
            | ExtMsg::Decline(AddressedMsg { remote_id, .. })
            | ExtMsg::Accept(AddressedMsg { remote_id, .. }) => *remote_id,
        }
    }

    /// Converts this extension message into the peer-to-peer message for application `app`.
    ///
    /// The remote id stored in the message is dropped; only the payload is carried over.
    ///
    /// # Panics
    ///
    /// Panics when called on [`ExtMsg::RegisterApp`], and on the container transfer variants
    /// ([`ExtMsg::SendContainer`], [`ExtMsg::RetrieveContainer`],
    /// [`ExtMsg::ContainerRetrieved`]), none of which is converted by this method.
    pub fn p2p_message(self, app: StormApp) -> p2p::Messages {
        match self {
            ExtMsg::RegisterApp(_) => {
                // The message names this method so a panic points at the right call site.
                unreachable!("ExtMsg::p2p_message must not be called on ExtMsg::RegisterApp")
            }
            ExtMsg::ListTopics(AddressedMsg { data, .. }) => {
                p2p::Messages::ListTopics(AppMsg { app, data })
            }
            ExtMsg::Topics(AddressedMsg { data, .. }) => {
                p2p::Messages::AppTopics(AppMsg { app, data })
            }
            ExtMsg::ProposeTopic(AddressedMsg { data, .. }) => {
                p2p::Messages::ProposeTopic(AppMsg { app, data })
            }
            ExtMsg::Post(AddressedMsg { data, .. }) => p2p::Messages::Post(AppMsg { app, data }),
            ExtMsg::Read(AddressedMsg { data, .. }) => p2p::Messages::Read(AppMsg { app, data }),
            ExtMsg::Decline(AddressedMsg { data, .. }) => {
                p2p::Messages::Decline(AppMsg { app, data })
            }
            ExtMsg::Accept(AddressedMsg { data, .. }) => {
                p2p::Messages::Accept(AppMsg { app, data })
            }
            ExtMsg::ContainerAnnouncement(AddressedMsg { data, .. }) => {
                p2p::Messages::AnnounceContainer(AppMsg { app, data })
            }
            ExtMsg::SendContainer(_)
            | ExtMsg::RetrieveContainer(_)
            | ExtMsg::ContainerRetrieved(_) => {
                unreachable!("the task is handled by a dedicated daemon")
            }
        }
    }

    /// Strict-serializes the payload of this message (without the remote id) into bytes.
    ///
    /// # Panics
    ///
    /// Panics when called on [`ExtMsg::RegisterApp`], or when strict serialization of the
    /// payload fails.
    pub fn to_payload(&self) -> Vec<u8> {
        match self {
            ExtMsg::RegisterApp(_) => {
                unreachable!("ExtMsg::to_payload must not be called on ExtMsg::RegisterApp")
            }
            ExtMsg::ListTopics(AddressedMsg { data, .. }) => data.strict_serialize(),
            ExtMsg::Topics(AddressedMsg { data, .. }) => data.strict_serialize(),
            ExtMsg::ProposeTopic(AddressedMsg { data, .. }) => data.strict_serialize(),
            ExtMsg::Post(AddressedMsg { data, .. }) => data.strict_serialize(),
            ExtMsg::Read(AddressedMsg { data, .. }) => data.strict_serialize(),
            ExtMsg::Decline(AddressedMsg { data, .. }) => data.strict_serialize(),
            ExtMsg::Accept(AddressedMsg { data, .. }) => data.strict_serialize(),
            ExtMsg::ContainerAnnouncement(AddressedMsg { data, .. }) => data.strict_serialize(),
            ExtMsg::SendContainer(AddressedMsg { data, .. }) => data.strict_serialize(),
            ExtMsg::RetrieveContainer(AddressedMsg { data, .. }) => data.strict_serialize(),
            ExtMsg::ContainerRetrieved(container_id) => container_id.strict_serialize(),
        }
        .expect("extension-generated message can't be serialized as a bifrost message payload")
    }
}