1use std::borrow::Cow;
2pub mod connection;
3use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5use typeshare::typeshare;
6
7use crate::{
8 durable::MessageDurableConfig,
9 endpoint::EndpointAddr,
10 interest::{Interest, Subject},
11 message::{Message, MessageAckExpectKind, MessageHeader, MessageId, MessageTargetKind},
12 proposal::{EndpointInterest, SetState},
13 topic::{TopicCode, WaitAckError, WaitAckSuccess},
14 util::MaybeBase64Bytes,
15 NodeId,
16};
17
18#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
19#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
20#[repr(u8)]
21#[typeshare]
22#[serde(tag = "kind", content = "content")]
23pub enum EdgeRequestEnum {
24 SendMessage(EdgeMessage),
25 EndpointOnline(EdgeEndpointOnline),
26 EndpointOffline(EdgeEndpointOffline),
27 EndpointInterest(EndpointInterest),
28 SetState(SetState),
29}
30#[derive(Debug, Clone, Serialize, Deserialize)]
31#[typeshare]
32#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
33
34pub struct EdgeEndpointOnline {
35 pub topic_code: TopicCode,
36 pub interests: Vec<Interest>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40#[typeshare]
41#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
42pub struct EdgeEndpointOffline {
43 pub topic_code: TopicCode,
44 pub endpoint: EndpointAddr,
45}
46
47#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
48#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
49#[typeshare]
50#[serde(tag = "kind", content = "content")]
51pub enum EdgePayload {
52 Push(EdgePush),
53 Response(EdgeResponse),
54 Request(EdgeRequest),
55 Error(EdgeError),
56}
57
58#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
59#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
60#[typeshare]
61#[serde(tag = "kind", content = "content")]
62pub enum EdgePush {
63 Message {
64 endpoints: Vec<EndpointAddr>,
65 message: Message,
66 },
67}
68
69#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
70#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
71#[typeshare]
72pub struct EdgeRequest {
73 pub seq_id: u32,
74 pub request: EdgeRequestEnum,
75}
76
77#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
78#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
79#[typeshare]
80pub struct EdgeResponse {
81 pub seq_id: u32,
82 pub result: EdgeResult<EdgeResponseEnum, EdgeError>,
83}
84
85#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
86#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
87#[typeshare]
88#[serde(tag = "kind", content = "content")]
89pub enum EdgeResponseEnum {
90 SendMessage(EdgeResult<WaitAckSuccess, WaitAckError>),
91 EndpointOnline(EndpointAddr),
92 EndpointOffline,
93 EndpointInterest,
94 SetState,
95}
96#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
97#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
98#[typeshare]
99#[serde(tag = "kind", content = "content")]
100pub enum EdgeResult<T, E> {
101 Ok(T),
102 Err(E),
103}
104
105impl<T, E> EdgeResult<T, E> {
106 pub fn from_std(result: Result<T, E>) -> Self {
107 match result {
108 Ok(t) => Self::Ok(t),
109 Err(e) => Self::Err(e),
110 }
111 }
112 pub fn into_std(self) -> Result<T, E> {
113 match self {
114 Self::Ok(t) => Ok(t),
115 Self::Err(e) => Err(e),
116 }
117 }
118}
119
120impl EdgeResponse {
121 pub fn from_result(id: u32, result: Result<EdgeResponseEnum, EdgeError>) -> Self {
122 Self {
123 seq_id: id,
124 result: EdgeResult::from_std(result),
125 }
126 }
127}
128#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
129#[typeshare]
130pub struct EdgeError {
131 pub context: Cow<'static, str>,
132 pub message: Option<Cow<'static, str>>,
133 pub kind: EdgeErrorKind,
134}
135
136impl EdgeError {
137 pub fn new(context: impl Into<Cow<'static, str>>, kind: EdgeErrorKind) -> Self {
138 Self {
139 context: context.into(),
140 message: None,
141 kind,
142 }
143 }
144 pub fn with_message(
145 context: impl Into<Cow<'static, str>>,
146 message: impl Into<Cow<'static, str>>,
147 kind: EdgeErrorKind,
148 ) -> Self {
149 Self {
150 context: context.into(),
151 message: Some(message.into()),
152 kind,
153 }
154 }
155}
156
157#[repr(u8)]
158#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
159#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
160#[typeshare]
161pub enum EdgeErrorKind {
162 Decode = 0x00,
163 TopicNotFound = 0x02,
164 EndpointNotFound = 0x03,
165 Unauthorized = 0x04,
166 Internal = 0xf0,
167}
168
169#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
170#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
171#[typeshare]
172pub struct EdgeMessageHeader {
173 pub ack_kind: MessageAckExpectKind,
174 pub target_kind: MessageTargetKind,
175 pub durability: Option<MessageDurableConfig>,
176 pub subjects: Vec<Subject>,
177 pub topic: TopicCode,
178}
179
180impl EdgeMessageHeader {
181 pub fn into_message_header(self) -> (MessageHeader, TopicCode) {
182 (
183 MessageHeader {
184 message_id: MessageId::new_snowflake(),
185 ack_kind: self.ack_kind,
186 target_kind: self.target_kind,
187 durability: self.durability,
188 subjects: self.subjects.into(),
189 },
190 self.topic,
191 )
192 }
193}
194#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
195#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
196#[typeshare]
197pub struct EdgeMessage {
198 pub header: EdgeMessageHeader,
199 pub payload: MaybeBase64Bytes,
200}
201
202pub struct EdgeMessageBuilder {
203 ack_kind: MessageAckExpectKind,
204 target_kind: MessageTargetKind,
205 durability: Option<MessageDurableConfig>,
206 subjects: Vec<Subject>,
207 topic: TopicCode,
208 payload: Bytes,
209}
210
211impl EdgeMessage {
212 pub fn builder<T, S, P>(topic_code: T, subjects: S, payload: P) -> EdgeMessageBuilder
213 where
214 T: Into<TopicCode>,
215 S: IntoIterator<Item = Subject>,
216 P: Into<Bytes>,
217 {
218 EdgeMessageBuilder {
219 ack_kind: MessageAckExpectKind::Sent,
220 target_kind: MessageTargetKind::Push,
221 durability: None,
222 subjects: subjects.into_iter().collect(),
223 topic: topic_code.into(),
224 payload: payload.into(),
225 }
226 }
227 pub fn into_message(self) -> (Message, TopicCode) {
228 let (header, topic) = self.header.into_message_header();
229 (Message::new(header, self.payload.0), topic)
230 }
231}
232
233impl EdgeMessageBuilder {
234 pub fn ack_kind(mut self, ack_kind: MessageAckExpectKind) -> Self {
235 self.ack_kind = ack_kind;
236 self
237 }
238 pub fn mode_durable(mut self, durability: MessageDurableConfig) -> Self {
239 self.durability = Some(durability);
240 self.target_kind = MessageTargetKind::Durable;
241 self
242 }
243 pub fn mode_online(mut self) -> Self {
244 self.target_kind = MessageTargetKind::Online;
245 self
246 }
247 pub fn mode_push(mut self) -> Self {
248 self.target_kind = MessageTargetKind::Push;
249 self
250 }
251 pub fn with_subject(mut self, subject: Subject) -> Self {
253 self.subjects.push(subject);
254 self
255 }
256 pub fn build(self) -> EdgeMessage {
257 EdgeMessage {
258 header: EdgeMessageHeader {
259 ack_kind: self.ack_kind,
260 target_kind: self.target_kind,
261 durability: self.durability,
262 subjects: self.subjects,
263 topic: self.topic,
264 },
265 payload: MaybeBase64Bytes(self.payload),
266 }
267 }
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize)]
271pub struct EdgeConfig {
272 pub peer_id: NodeId,
273 pub peer_auth: EdgeAuth,
274}
275
276#[derive(Debug, Clone, Default, Serialize, Deserialize)]
277pub struct EdgeAuth {
278 pub payload: MaybeBase64Bytes,
279}