1use std::sync::Arc;
2
3use libp2p_identity::PeerId;
4use regex::Regex;
5#[cfg(feature = "serde")]
6use serde::{Deserialize, Serialize};
7
8use crate::{
9 error::{Error, Result},
10 grpc,
11};
12
13#[derive(Debug, Clone)]
14#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
15pub enum TopicQuery {
16 String(Arc<str>),
17 #[cfg_attr(feature = "serde", serde(with = "string"))]
18 Regex(Regex),
19}
20
21impl TopicQuery {
22 pub fn matches(&self, topic: impl AsRef<str>) -> bool {
23 match self {
24 TopicQuery::String(query) => query.as_ref() == topic.as_ref(),
25 TopicQuery::Regex(regex) => regex.is_match(topic.as_ref()),
26 }
27 }
28}
29
30impl From<TopicQuery> for grpc::TopicQuery {
31 fn from(query: TopicQuery) -> Self {
32 let query = match query {
33 TopicQuery::String(topic) => grpc::topic_query::Query::Topic(grpc::Topic {
34 topic: topic.to_string(),
35 }),
36 TopicQuery::Regex(regex) => grpc::topic_query::Query::Regex(regex.to_string()),
37 };
38
39 Self { query: Some(query) }
40 }
41}
42
43impl TryFrom<grpc::TopicQuery> for TopicQuery {
44 type Error = Error;
45
46 fn try_from(query: grpc::TopicQuery) -> Result<Self> {
47 Ok(match query.query.ok_or(Error::MissingQuery)? {
48 grpc::topic_query::Query::Topic(topic) => Self::String(topic.topic.into()),
49 grpc::topic_query::Query::Regex(regex) => Self::Regex(Regex::new(®ex)?),
50 })
51 }
52}
53
54impl From<String> for TopicQuery {
55 fn from(topic: String) -> Self {
56 Self::String(topic.into())
57 }
58}
59
60impl From<&str> for TopicQuery {
61 fn from(topic: &str) -> Self {
62 Self::String(topic.into())
63 }
64}
65
66impl From<Arc<str>> for TopicQuery {
67 fn from(topic: Arc<str>) -> Self {
68 Self::String(topic)
69 }
70}
71
72impl From<Regex> for TopicQuery {
73 fn from(regex: Regex) -> Self {
74 Self::Regex(regex)
75 }
76}
77
78#[derive(Debug, Clone)]
79#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
80pub struct Request {
81 pub data: Vec<u8>,
82 pub topic: Option<String>,
83}
84
85impl From<Request> for grpc::Message {
86 fn from(request: Request) -> Self {
87 Self {
88 data: grpc::Data { data: request.data },
89 topic: grpc::OptionalTopic {
90 topic: request.topic.map(|topic| grpc::Topic { topic }),
91 },
92 }
93 }
94}
95
96impl From<grpc::Message> for Request {
97 fn from(message: grpc::Message) -> Self {
98 Self {
99 data: message.data.data,
100 topic: message.topic.topic.map(|topic| topic.topic),
101 }
102 }
103}
104
105#[derive(Debug, Clone)]
106#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
107pub struct InboundRequest {
108 pub id: u64,
109 pub peer_id: PeerId,
110 pub req: Request,
111}
112
113impl From<InboundRequest> for grpc::RecvRequest {
114 fn from(request: InboundRequest) -> Self {
115 Self {
116 peer: request.peer_id.into(),
117 msg: request.req.into(),
118 seq: request.id,
119 }
120 }
121}
122
123impl TryFrom<grpc::RecvRequest> for InboundRequest {
124 type Error = Error;
125
126 fn try_from(request: grpc::RecvRequest) -> Result<Self> {
127 Ok(Self {
128 id: request.seq,
129 peer_id: request.peer.try_into()?,
130 req: request.msg.into(),
131 })
132 }
133}
134
135#[derive(Debug, Clone, thiserror::Error)]
136#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
137pub enum ResponseError {
138 #[error("Request timed out")]
139 Timeout,
140 #[error(
141 "Peer is not subscribed to {}",
142 .0.as_deref().map_or("the empty topic".to_string(), |topic| format!("topic '{topic:?}'"))
143 )]
144 TopicNotSubscribed(Option<String>),
145 #[error("Script error: {0}")]
146 Script(String),
147}
148
149impl From<String> for ResponseError {
150 fn from(e: String) -> Self {
151 ResponseError::Script(e)
152 }
153}
154
155#[derive(Debug, Clone)]
156#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
157pub enum Response {
158 Data(Vec<u8>),
159 Error(ResponseError),
160}
161
162impl From<Response> for grpc::Response {
163 fn from(response: Response) -> Self {
164 Self {
165 response: Some(match response {
166 Response::Data(data) => grpc::response::Response::Data(grpc::Data { data }),
167 Response::Error(e) => grpc::response::Response::Error(e.to_string()),
168 }),
169 }
170 }
171}
172
173impl TryFrom<Response> for Vec<u8> {
174 type Error = ResponseError;
175
176 fn try_from(response: Response) -> Result<Self, Self::Error> {
177 match response {
178 Response::Data(data) => Ok(data),
179 Response::Error(e) => Err(e),
180 }
181 }
182}
183
184impl TryFrom<grpc::Response> for Response {
185 type Error = Error;
186
187 fn try_from(response: grpc::Response) -> Result<Self> {
188 Ok(match response.response.ok_or(Error::MissingResponse)? {
189 grpc::response::Response::Data(data) => Self::Data(data.data),
190 grpc::response::Response::Error(e) => Self::Error(e.into()),
191 })
192 }
193}
194
195#[cfg(feature = "serde")]
196mod string {
197 use std::{fmt::Display, str::FromStr};
198
199 use serde::{de, Deserialize, Deserializer, Serializer};
200
201 pub fn serialize<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
202 where
203 T: Display,
204 S: Serializer,
205 {
206 serializer.collect_str(value)
207 }
208
209 pub fn deserialize<'de, T, D>(deserializer: D) -> Result<T, D::Error>
210 where
211 T: FromStr,
212 T::Err: Display,
213 D: Deserializer<'de>,
214 {
215 String::deserialize(deserializer)?
216 .parse()
217 .map_err(de::Error::custom)
218 }
219}