hyveos_core/
req_resp.rs

1use std::sync::Arc;
2
3use libp2p_identity::PeerId;
4use regex::Regex;
5#[cfg(feature = "serde")]
6use serde::{Deserialize, Serialize};
7
8use crate::{
9    error::{Error, Result},
10    grpc,
11};
12
13#[derive(Debug, Clone)]
14#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
15pub enum TopicQuery {
16    String(Arc<str>),
17    #[cfg_attr(feature = "serde", serde(with = "string"))]
18    Regex(Regex),
19}
20
21impl TopicQuery {
22    pub fn matches(&self, topic: impl AsRef<str>) -> bool {
23        match self {
24            TopicQuery::String(query) => query.as_ref() == topic.as_ref(),
25            TopicQuery::Regex(regex) => regex.is_match(topic.as_ref()),
26        }
27    }
28}
29
30impl From<TopicQuery> for grpc::TopicQuery {
31    fn from(query: TopicQuery) -> Self {
32        let query = match query {
33            TopicQuery::String(topic) => grpc::topic_query::Query::Topic(grpc::Topic {
34                topic: topic.to_string(),
35            }),
36            TopicQuery::Regex(regex) => grpc::topic_query::Query::Regex(regex.to_string()),
37        };
38
39        Self { query: Some(query) }
40    }
41}
42
43impl TryFrom<grpc::TopicQuery> for TopicQuery {
44    type Error = Error;
45
46    fn try_from(query: grpc::TopicQuery) -> Result<Self> {
47        Ok(match query.query.ok_or(Error::MissingQuery)? {
48            grpc::topic_query::Query::Topic(topic) => Self::String(topic.topic.into()),
49            grpc::topic_query::Query::Regex(regex) => Self::Regex(Regex::new(&regex)?),
50        })
51    }
52}
53
54impl From<String> for TopicQuery {
55    fn from(topic: String) -> Self {
56        Self::String(topic.into())
57    }
58}
59
60impl From<&str> for TopicQuery {
61    fn from(topic: &str) -> Self {
62        Self::String(topic.into())
63    }
64}
65
66impl From<Arc<str>> for TopicQuery {
67    fn from(topic: Arc<str>) -> Self {
68        Self::String(topic)
69    }
70}
71
72impl From<Regex> for TopicQuery {
73    fn from(regex: Regex) -> Self {
74        Self::Regex(regex)
75    }
76}
77
78#[derive(Debug, Clone)]
79#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
80pub struct Request {
81    pub data: Vec<u8>,
82    pub topic: Option<String>,
83}
84
85impl From<Request> for grpc::Message {
86    fn from(request: Request) -> Self {
87        Self {
88            data: grpc::Data { data: request.data },
89            topic: grpc::OptionalTopic {
90                topic: request.topic.map(|topic| grpc::Topic { topic }),
91            },
92        }
93    }
94}
95
96impl From<grpc::Message> for Request {
97    fn from(message: grpc::Message) -> Self {
98        Self {
99            data: message.data.data,
100            topic: message.topic.topic.map(|topic| topic.topic),
101        }
102    }
103}
104
105#[derive(Debug, Clone)]
106#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
107pub struct InboundRequest {
108    pub id: u64,
109    pub peer_id: PeerId,
110    pub req: Request,
111}
112
113impl From<InboundRequest> for grpc::RecvRequest {
114    fn from(request: InboundRequest) -> Self {
115        Self {
116            peer: request.peer_id.into(),
117            msg: request.req.into(),
118            seq: request.id,
119        }
120    }
121}
122
123impl TryFrom<grpc::RecvRequest> for InboundRequest {
124    type Error = Error;
125
126    fn try_from(request: grpc::RecvRequest) -> Result<Self> {
127        Ok(Self {
128            id: request.seq,
129            peer_id: request.peer.try_into()?,
130            req: request.msg.into(),
131        })
132    }
133}
134
135#[derive(Debug, Clone, thiserror::Error)]
136#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
137pub enum ResponseError {
138    #[error("Request timed out")]
139    Timeout,
140    #[error(
141        "Peer is not subscribed to {}",
142        .0.as_deref().map_or("the empty topic".to_string(), |topic| format!("topic '{topic:?}'"))
143    )]
144    TopicNotSubscribed(Option<String>),
145    #[error("Script error: {0}")]
146    Script(String),
147}
148
149impl From<String> for ResponseError {
150    fn from(e: String) -> Self {
151        ResponseError::Script(e)
152    }
153}
154
155#[derive(Debug, Clone)]
156#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
157pub enum Response {
158    Data(Vec<u8>),
159    Error(ResponseError),
160}
161
162impl From<Response> for grpc::Response {
163    fn from(response: Response) -> Self {
164        Self {
165            response: Some(match response {
166                Response::Data(data) => grpc::response::Response::Data(grpc::Data { data }),
167                Response::Error(e) => grpc::response::Response::Error(e.to_string()),
168            }),
169        }
170    }
171}
172
173impl TryFrom<Response> for Vec<u8> {
174    type Error = ResponseError;
175
176    fn try_from(response: Response) -> Result<Self, Self::Error> {
177        match response {
178            Response::Data(data) => Ok(data),
179            Response::Error(e) => Err(e),
180        }
181    }
182}
183
184impl TryFrom<grpc::Response> for Response {
185    type Error = Error;
186
187    fn try_from(response: grpc::Response) -> Result<Self> {
188        Ok(match response.response.ok_or(Error::MissingResponse)? {
189            grpc::response::Response::Data(data) => Self::Data(data.data),
190            grpc::response::Response::Error(e) => Self::Error(e.into()),
191        })
192    }
193}
194
195#[cfg(feature = "serde")]
196mod string {
197    use std::{fmt::Display, str::FromStr};
198
199    use serde::{de, Deserialize, Deserializer, Serializer};
200
201    pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
202    where
203        T: Display,
204        S: Serializer,
205    {
206        serializer.collect_str(value)
207    }
208
209    pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
210    where
211        T: FromStr,
212        T::Err: Display,
213        D: Deserializer<'de>,
214    {
215        String::deserialize(deserializer)?
216            .parse()
217            .map_err(de::Error::custom)
218    }
219}