starlane_core/
message.rs

1use std::collections::HashSet;
2use std::convert::{Infallible, TryFrom, TryInto};
3use std::string::FromUtf8Error;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::{broadcast, oneshot};
7use uuid::Uuid;
8
9use starlane_resources::message::{Message, MessageId, MessageReply, ProtoMessage, ResourceRequestMessage, ResourceResponseMessage, ResourcePortMessage};
10use starlane_resources::ResourceIdentifier;
11
12use crate::error::Error;
13use crate::frame::{
14    Frame, MessageAck, MessagePayload, ReplyKind, SimpleReply, StarMessage, StarMessagePayload,
15};
16use crate::resource::{ResourceAddress, ResourceKind, ResourceType, Specific};
17use crate::star::{StarCommand, StarKey};
18use crate::star::shell::search::{StarSearchTransaction, TransactionResult};
19
20pub mod resource;
21
22#[derive(Clone)]
23pub enum ProtoStarMessageTo {
24    None,
25    Star(StarKey),
26    Resource(ResourceIdentifier),
27}
28
29impl ProtoStarMessageTo {
30    pub fn is_none(&self) -> bool {
31        match self {
32            ProtoStarMessageTo::None => true,
33            ProtoStarMessageTo::Star(_) => false,
34            ProtoStarMessageTo::Resource(_) => false,
35        }
36    }
37}
38
39impl From<StarKey> for ProtoStarMessageTo {
40    fn from(key: StarKey) -> Self {
41        ProtoStarMessageTo::Star(key)
42    }
43}
44
45impl From<ResourceIdentifier> for ProtoStarMessageTo {
46    fn from(id: ResourceIdentifier) -> Self {
47        ProtoStarMessageTo::Resource(id)
48    }
49}
50
51impl From<Option<ResourceIdentifier>> for ProtoStarMessageTo {
52    fn from(id: Option<ResourceIdentifier>) -> Self {
53        match id {
54            None => ProtoStarMessageTo::None,
55            Some(id) => ProtoStarMessageTo::Resource(id.into()),
56        }
57    }
58}
59
60pub struct ProtoStarMessage {
61    pub to: ProtoStarMessageTo,
62    pub payload: StarMessagePayload,
63    pub tx: broadcast::Sender<MessageUpdate>,
64    pub rx: broadcast::Receiver<MessageUpdate>,
65    pub reply_to: Option<MessageId>,
66    pub trace: bool,
67    pub log: bool,
68}
69
70impl ProtoStarMessage {
71    pub fn new() -> Self {
72        let (tx, rx) = broadcast::channel(8);
73        ProtoStarMessage::with_txrx(tx, rx)
74    }
75
76    pub fn with_txrx(
77        tx: broadcast::Sender<MessageUpdate>,
78        rx: broadcast::Receiver<MessageUpdate>,
79    ) -> Self {
80        ProtoStarMessage {
81            to: ProtoStarMessageTo::None,
82            payload: StarMessagePayload::None,
83            tx: tx,
84            rx: rx,
85            reply_to: Option::None,
86            trace: false,
87            log: false,
88        }
89    }
90
91    pub fn to(&mut self, to: ProtoStarMessageTo) {
92        self.to = to;
93    }
94
95    pub fn reply_to(&mut self, reply_to: MessageId) {
96        self.reply_to = Option::Some(reply_to);
97    }
98
99    pub fn validate(&self) -> Result<(), Error> {
100        let mut errors = vec![];
101        if self.to.is_none() {
102            errors.push("must specify 'to' field");
103        }
104        if let StarMessagePayload::None = self.payload {
105            errors.push("must specify a message payload");
106        }
107
108        if !errors.is_empty() {
109            let mut rtn = String::new();
110            for err in errors {
111                rtn.push_str(err);
112                rtn.push('\n');
113            }
114            return Err(rtn.into());
115        }
116
117        return Ok(());
118    }
119}
120impl TryFrom<ProtoMessage<ResourceRequestMessage>> for ProtoStarMessage {
121
122    type Error = Error;
123
124    fn try_from(proto: ProtoMessage<ResourceRequestMessage>) -> Result<Self, Self::Error> {
125        proto.validate()?;
126        let message = proto.create()?;
127        let mut proto = ProtoStarMessage::new();
128        proto.to = message.to.clone().into();
129        proto.trace = message.trace;
130        proto.log = message.log;
131        proto.payload = StarMessagePayload::MessagePayload(MessagePayload::Request(message));
132        Ok(proto)
133    }
134}
135
136impl TryFrom<ProtoMessage<ResourcePortMessage>> for ProtoStarMessage {
137
138    type Error = Error;
139
140    fn try_from(proto: ProtoMessage<ResourcePortMessage>) -> Result<Self, Self::Error> {
141        proto.validate()?;
142        let message = proto.create()?;
143        message.try_into()
144    }
145}
146
147impl TryFrom<Message<ResourcePortMessage>> for ProtoStarMessage {
148
149    type Error = Error;
150
151    fn try_from(message: Message<ResourcePortMessage>) -> Result<Self, Self::Error> {
152        let mut proto = ProtoStarMessage::new();
153        proto.to = message.to.clone().into();
154        proto.trace = message.trace;
155        proto.log = message.log;
156        proto.payload = StarMessagePayload::MessagePayload(MessagePayload::PortRequest(message));
157        Ok(proto)
158    }
159}
160
161pub struct MessageReplyTracker {
162    pub reply_to: MessageId,
163    pub tx: broadcast::Sender<MessageUpdate>,
164}
165
166impl MessageReplyTracker {
167    pub fn on_message(&self, message: &StarMessage) -> TrackerJob {
168        match &message.payload {
169            StarMessagePayload::Reply(reply) => match reply {
170                SimpleReply::Ok(_reply) => {
171                    self.tx.send(MessageUpdate::Result(MessageResult::Ok(
172                        message.payload.clone(),
173                    )));
174                    TrackerJob::Done
175                }
176                SimpleReply::Fail(fail) => {
177                    self.tx
178                        .send(MessageUpdate::Result(MessageResult::Err(fail.to_string())));
179                    TrackerJob::Done
180                }
181                SimpleReply::Ack(ack) => {
182                    self.tx.send(MessageUpdate::Ack(ack.clone()));
183                    TrackerJob::Continue
184                }
185            },
186            _ => TrackerJob::Continue,
187        }
188    }
189}
190
/// Outcome reported by a tracker after it has looked at a message.
pub enum TrackerJob {
    /// The tracker is not finished yet.
    Continue,
    /// The tracker has finished.
    Done,
}
195
196#[derive(Clone)]
197pub enum MessageUpdate {
198    Ack(MessageAck),
199    Result(MessageResult<StarMessagePayload>),
200}
201
/// Result of a message exchange, generic over the success value.
#[derive(Clone)]
pub enum MessageResult<OK> {
    /// The exchange succeeded with the given value.
    Ok(OK),
    /// The exchange failed; the string describes the failure.
    Err(String),
    /// The exchange timed out.
    Timeout,
}

/// Short human-readable rendering: `Ok`, `Err(<text>)` or `Timeout`.
///
/// Implemented as `Display` rather than a direct `ToString` impl so the type
/// works with `{}` formatting; `to_string()` is still available through the
/// standard blanket impl and produces the same text as before. The success
/// value itself is never printed, so no bound on `OK` is required.
impl<OK> std::fmt::Display for MessageResult<OK> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            MessageResult::Ok(_) => f.write_str("Ok"),
            MessageResult::Err(err) => write!(f, "Err({})", err),
            MessageResult::Timeout => f.write_str("Timeout"),
        }
    }
}
218
219#[derive(Clone)]
220pub enum MessageExpect {
221    None,
222    Reply(ReplyKind),
223}
224
/// Coarse waiting class, each with its own wait time and retry count.
#[derive(Clone)]
pub enum MessageExpectWait {
    /// 5 seconds, 5 retries.
    Short,
    /// 10 seconds, 10 retries.
    Med,
    /// 30 seconds, 15 retries.
    Long,
}

impl MessageExpectWait {
    /// Number of seconds associated with this waiting class.
    pub fn wait_seconds(&self) -> u64 {
        self.settings().0
    }

    /// Number of retries associated with this waiting class.
    pub fn retries(&self) -> usize {
        self.settings().1
    }

    /// Single lookup table: `(wait seconds, retries)` per waiting class.
    fn settings(&self) -> (u64, usize) {
        match self {
            Self::Short => (5, 5),
            Self::Med => (10, 10),
            Self::Long => (30, 15),
        }
    }
}
249
250pub struct OkResultWaiter {
251    rx: broadcast::Receiver<MessageUpdate>,
252    tx: oneshot::Sender<StarMessagePayload>,
253}
254
255impl OkResultWaiter {
256    pub fn new(
257        rx: broadcast::Receiver<MessageUpdate>,
258    ) -> (Self, oneshot::Receiver<StarMessagePayload>) {
259        let (tx, osrx) = oneshot::channel();
260        (OkResultWaiter { rx: rx, tx: tx }, osrx)
261    }
262
263    pub async fn wait(mut self) {
264        tokio::spawn(async move {
265            loop {
266                if let Ok(MessageUpdate::Result(result)) = self.rx.recv().await {
267                    match result {
268                        MessageResult::Ok(payload) => {
269                            self.tx.send(payload);
270                        }
271                        x => {
272                            eprintln!(
273                                "not expecting this results for OkResultWaiter...{} ",
274                                x.to_string()
275                            );
276                            self.tx.send(StarMessagePayload::None);
277                        }
278                    }
279                    break;
280                }
281            }
282        });
283    }
284}
285
286pub struct ResultWaiter {
287    rx: broadcast::Receiver<MessageUpdate>,
288    tx: oneshot::Sender<MessageResult<StarMessagePayload>>,
289}
290
291impl ResultWaiter {
292    pub fn new(
293        rx: broadcast::Receiver<MessageUpdate>,
294    ) -> (Self, oneshot::Receiver<MessageResult<StarMessagePayload>>) {
295        let (tx, osrx) = oneshot::channel();
296        (ResultWaiter { rx: rx, tx: tx }, osrx)
297    }
298
299    pub async fn wait(mut self) {
300        tokio::spawn(async move {
301            loop {
302                if let Ok(MessageUpdate::Result(result)) = self.rx.recv().await {
303                    self.tx.send(result);
304                    break;
305                }
306            }
307        });
308    }
309}
310
311#[derive(Debug, Clone, Serialize, Deserialize)]
312pub struct Reject {
313    pub reason: String,
314    pub kind: RejectKind,
315}
316
317#[derive(Debug, Clone, Serialize, Deserialize)]
318pub enum RejectKind {
319    Error,
320    Denied,
321    BadRequest,
322}
323
324fn hash_to_string(hash: &HashSet<ResourceType>) -> String {
325    let mut rtn = String::new();
326    for i in hash.iter() {
327        rtn.push_str(i.to_string().as_str());
328        rtn.push_str(", ");
329    }
330    rtn
331}
332
333impl From<Message<ResourceRequestMessage>> for ProtoStarMessage {
334    fn from(message: Message<ResourceRequestMessage>) -> Self {
335        let mut proto = ProtoStarMessage::new();
336        proto.to = message.to.clone().into();
337        proto.payload = StarMessagePayload::MessagePayload(MessagePayload::Request(message));
338        proto
339    }
340}
341
342impl From<MessageReply<ResourceResponseMessage>> for ProtoStarMessage {
343
344    fn from(reply: MessageReply<ResourceResponseMessage>) -> Self {
345        let mut proto = ProtoStarMessage::new();
346        proto.payload = StarMessagePayload::MessagePayload(MessagePayload::Response(reply));
347        proto
348    }
349}
350
351impl From<Message<ResourceRequestMessage>> for StarMessagePayload {
352    fn from( message: Message<ResourceRequestMessage> ) -> Self {
353        StarMessagePayload::MessagePayload(MessagePayload::Request(message))
354    }
355}
356
357impl From<MessageReply<ResourceResponseMessage>> for StarMessagePayload {
358    fn from( message: MessageReply<ResourceResponseMessage> ) -> Self {
359        StarMessagePayload::MessagePayload(MessagePayload::Response(message))
360    }
361}