// asteroid_mq_model/edge.rs

1use std::borrow::Cow;
2pub mod connection;
3use bytes::Bytes;
4use serde::{Deserialize, Serialize};
5use typeshare::typeshare;
6
7use crate::{
8    durable::MessageDurableConfig,
9    endpoint::EndpointAddr,
10    interest::{Interest, Subject},
11    message::{Message, MessageAckExpectKind, MessageHeader, MessageId, MessageTargetKind},
12    proposal::{EndpointInterest, SetState},
13    topic::{TopicCode, WaitAckError, WaitAckSuccess},
14    util::MaybeBase64Bytes,
15    NodeId,
16};
17
18#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
19#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
20#[repr(u8)]
21#[typeshare]
22#[serde(tag = "kind", content = "content")]
23pub enum EdgeRequestEnum {
24    SendMessage(EdgeMessage),
25    EndpointOnline(EdgeEndpointOnline),
26    EndpointOffline(EdgeEndpointOffline),
27    EndpointInterest(EndpointInterest),
28    SetState(SetState),
29}
30#[derive(Debug, Clone, Serialize, Deserialize)]
31#[typeshare]
32#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
33
34pub struct EdgeEndpointOnline {
35    pub topic_code: TopicCode,
36    pub interests: Vec<Interest>,
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
40#[typeshare]
41#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
42pub struct EdgeEndpointOffline {
43    pub topic_code: TopicCode,
44    pub endpoint: EndpointAddr,
45}
46
47#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
48#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
49#[typeshare]
50#[serde(tag = "kind", content = "content")]
51pub enum EdgePayload {
52    Push(EdgePush),
53    Response(EdgeResponse),
54    Request(EdgeRequest),
55    Error(EdgeError),
56}
57
58#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
59#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
60#[typeshare]
61#[serde(tag = "kind", content = "content")]
62pub enum EdgePush {
63    Message {
64        endpoints: Vec<EndpointAddr>,
65        message: Message,
66    },
67}
68
69#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
70#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
71#[typeshare]
72pub struct EdgeRequest {
73    pub seq_id: u32,
74    pub request: EdgeRequestEnum,
75}
76
77#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
78#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
79#[typeshare]
80pub struct EdgeResponse {
81    pub seq_id: u32,
82    pub result: EdgeResult<EdgeResponseEnum, EdgeError>,
83}
84
85#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
86#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
87#[typeshare]
88#[serde(tag = "kind", content = "content")]
89pub enum EdgeResponseEnum {
90    SendMessage(EdgeResult<WaitAckSuccess, WaitAckError>),
91    EndpointOnline(EndpointAddr),
92    EndpointOffline,
93    EndpointInterest,
94    SetState,
95}
96#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
97#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
98#[typeshare]
99#[serde(tag = "kind", content = "content")]
100pub enum EdgeResult<T, E> {
101    Ok(T),
102    Err(E),
103}
104
105impl<T, E> EdgeResult<T, E> {
106    pub fn from_std(result: Result<T, E>) -> Self {
107        match result {
108            Ok(t) => Self::Ok(t),
109            Err(e) => Self::Err(e),
110        }
111    }
112    pub fn into_std(self) -> Result<T, E> {
113        match self {
114            Self::Ok(t) => Ok(t),
115            Self::Err(e) => Err(e),
116        }
117    }
118}
119
120impl EdgeResponse {
121    pub fn from_result(id: u32, result: Result<EdgeResponseEnum, EdgeError>) -> Self {
122        Self {
123            seq_id: id,
124            result: EdgeResult::from_std(result),
125        }
126    }
127}
128#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
129#[typeshare]
130pub struct EdgeError {
131    pub context: Cow<'static, str>,
132    pub message: Option<Cow<'static, str>>,
133    pub kind: EdgeErrorKind,
134}
135
136impl EdgeError {
137    pub fn new(context: impl Into<Cow<'static, str>>, kind: EdgeErrorKind) -> Self {
138        Self {
139            context: context.into(),
140            message: None,
141            kind,
142        }
143    }
144    pub fn with_message(
145        context: impl Into<Cow<'static, str>>,
146        message: impl Into<Cow<'static, str>>,
147        kind: EdgeErrorKind,
148    ) -> Self {
149        Self {
150            context: context.into(),
151            message: Some(message.into()),
152            kind,
153        }
154    }
155}
156
157#[repr(u8)]
158#[derive(Debug, Clone, Copy, serde::Serialize, serde::Deserialize)]
159#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
160#[typeshare]
161pub enum EdgeErrorKind {
162    Decode = 0x00,
163    TopicNotFound = 0x02,
164    EndpointNotFound = 0x03,
165    Unauthorized = 0x04,
166    Internal = 0xf0,
167}
168
169#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
170#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
171#[typeshare]
172pub struct EdgeMessageHeader {
173    pub ack_kind: MessageAckExpectKind,
174    pub target_kind: MessageTargetKind,
175    pub durability: Option<MessageDurableConfig>,
176    pub subjects: Vec<Subject>,
177    pub topic: TopicCode,
178}
179
180impl EdgeMessageHeader {
181    pub fn into_message_header(self) -> (MessageHeader, TopicCode) {
182        (
183            MessageHeader {
184                message_id: MessageId::new_snowflake(),
185                ack_kind: self.ack_kind,
186                target_kind: self.target_kind,
187                durability: self.durability,
188                subjects: self.subjects.into(),
189            },
190            self.topic,
191        )
192    }
193}
194#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
195#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
196#[typeshare]
197pub struct EdgeMessage {
198    pub header: EdgeMessageHeader,
199    pub payload: MaybeBase64Bytes,
200}
201
202pub struct EdgeMessageBuilder {
203    ack_kind: MessageAckExpectKind,
204    target_kind: MessageTargetKind,
205    durability: Option<MessageDurableConfig>,
206    subjects: Vec<Subject>,
207    topic: TopicCode,
208    payload: Bytes,
209}
210
211impl EdgeMessage {
212    pub fn builder<T, S, P>(topic_code: T, subjects: S, payload: P) -> EdgeMessageBuilder
213    where
214        T: Into<TopicCode>,
215        S: IntoIterator<Item = Subject>,
216        P: Into<Bytes>,
217    {
218        EdgeMessageBuilder {
219            ack_kind: MessageAckExpectKind::Sent,
220            target_kind: MessageTargetKind::Push,
221            durability: None,
222            subjects: subjects.into_iter().collect(),
223            topic: topic_code.into(),
224            payload: payload.into(),
225        }
226    }
227    pub fn into_message(self) -> (Message, TopicCode) {
228        let (header, topic) = self.header.into_message_header();
229        (Message::new(header, self.payload.0), topic)
230    }
231}
232
233impl EdgeMessageBuilder {
234    pub fn ack_kind(mut self, ack_kind: MessageAckExpectKind) -> Self {
235        self.ack_kind = ack_kind;
236        self
237    }
238    pub fn mode_durable(mut self, durability: MessageDurableConfig) -> Self {
239        self.durability = Some(durability);
240        self.target_kind = MessageTargetKind::Durable;
241        self
242    }
243    pub fn mode_online(mut self) -> Self {
244        self.target_kind = MessageTargetKind::Online;
245        self
246    }
247    pub fn mode_push(mut self) -> Self {
248        self.target_kind = MessageTargetKind::Push;
249        self
250    }
251    /// wrap a subject with this message
252    pub fn with_subject(mut self, subject: Subject) -> Self {
253        self.subjects.push(subject);
254        self
255    }
256    pub fn build(self) -> EdgeMessage {
257        EdgeMessage {
258            header: EdgeMessageHeader {
259                ack_kind: self.ack_kind,
260                target_kind: self.target_kind,
261                durability: self.durability,
262                subjects: self.subjects,
263                topic: self.topic,
264            },
265            payload: MaybeBase64Bytes(self.payload),
266        }
267    }
268}
269
270#[derive(Debug, Clone, Serialize, Deserialize)]
271pub struct EdgeConfig {
272    pub peer_id: NodeId,
273    pub peer_auth: EdgeAuth,
274}
275
276#[derive(Debug, Clone, Default, Serialize, Deserialize)]
277pub struct EdgeAuth {
278    pub payload: MaybeBase64Bytes,
279}