1use std::collections::HashSet;
2use std::convert::{Infallible, TryFrom, TryInto};
3use std::string::FromUtf8Error;
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::{broadcast, oneshot};
7use uuid::Uuid;
8
9use starlane_resources::message::{Message, MessageId, MessageReply, ProtoMessage, ResourceRequestMessage, ResourceResponseMessage, ResourcePortMessage};
10use starlane_resources::ResourceIdentifier;
11
12use crate::error::Error;
13use crate::frame::{
14 Frame, MessageAck, MessagePayload, ReplyKind, SimpleReply, StarMessage, StarMessagePayload,
15};
16use crate::resource::{ResourceAddress, ResourceKind, ResourceType, Specific};
17use crate::star::{StarCommand, StarKey};
18use crate::star::shell::search::{StarSearchTransaction, TransactionResult};
19
20pub mod resource;
21
22#[derive(Clone)]
23pub enum ProtoStarMessageTo {
24 None,
25 Star(StarKey),
26 Resource(ResourceIdentifier),
27}
28
29impl ProtoStarMessageTo {
30 pub fn is_none(&self) -> bool {
31 match self {
32 ProtoStarMessageTo::None => true,
33 ProtoStarMessageTo::Star(_) => false,
34 ProtoStarMessageTo::Resource(_) => false,
35 }
36 }
37}
38
39impl From<StarKey> for ProtoStarMessageTo {
40 fn from(key: StarKey) -> Self {
41 ProtoStarMessageTo::Star(key)
42 }
43}
44
45impl From<ResourceIdentifier> for ProtoStarMessageTo {
46 fn from(id: ResourceIdentifier) -> Self {
47 ProtoStarMessageTo::Resource(id)
48 }
49}
50
51impl From<Option<ResourceIdentifier>> for ProtoStarMessageTo {
52 fn from(id: Option<ResourceIdentifier>) -> Self {
53 match id {
54 None => ProtoStarMessageTo::None,
55 Some(id) => ProtoStarMessageTo::Resource(id.into()),
56 }
57 }
58}
59
60pub struct ProtoStarMessage {
61 pub to: ProtoStarMessageTo,
62 pub payload: StarMessagePayload,
63 pub tx: broadcast::Sender<MessageUpdate>,
64 pub rx: broadcast::Receiver<MessageUpdate>,
65 pub reply_to: Option<MessageId>,
66 pub trace: bool,
67 pub log: bool,
68}
69
70impl ProtoStarMessage {
71 pub fn new() -> Self {
72 let (tx, rx) = broadcast::channel(8);
73 ProtoStarMessage::with_txrx(tx, rx)
74 }
75
76 pub fn with_txrx(
77 tx: broadcast::Sender<MessageUpdate>,
78 rx: broadcast::Receiver<MessageUpdate>,
79 ) -> Self {
80 ProtoStarMessage {
81 to: ProtoStarMessageTo::None,
82 payload: StarMessagePayload::None,
83 tx: tx,
84 rx: rx,
85 reply_to: Option::None,
86 trace: false,
87 log: false,
88 }
89 }
90
91 pub fn to(&mut self, to: ProtoStarMessageTo) {
92 self.to = to;
93 }
94
95 pub fn reply_to(&mut self, reply_to: MessageId) {
96 self.reply_to = Option::Some(reply_to);
97 }
98
99 pub fn validate(&self) -> Result<(), Error> {
100 let mut errors = vec![];
101 if self.to.is_none() {
102 errors.push("must specify 'to' field");
103 }
104 if let StarMessagePayload::None = self.payload {
105 errors.push("must specify a message payload");
106 }
107
108 if !errors.is_empty() {
109 let mut rtn = String::new();
110 for err in errors {
111 rtn.push_str(err);
112 rtn.push('\n');
113 }
114 return Err(rtn.into());
115 }
116
117 return Ok(());
118 }
119}
120impl TryFrom<ProtoMessage<ResourceRequestMessage>> for ProtoStarMessage {
121
122 type Error = Error;
123
124 fn try_from(proto: ProtoMessage<ResourceRequestMessage>) -> Result<Self, Self::Error> {
125 proto.validate()?;
126 let message = proto.create()?;
127 let mut proto = ProtoStarMessage::new();
128 proto.to = message.to.clone().into();
129 proto.trace = message.trace;
130 proto.log = message.log;
131 proto.payload = StarMessagePayload::MessagePayload(MessagePayload::Request(message));
132 Ok(proto)
133 }
134}
135
136impl TryFrom<ProtoMessage<ResourcePortMessage>> for ProtoStarMessage {
137
138 type Error = Error;
139
140 fn try_from(proto: ProtoMessage<ResourcePortMessage>) -> Result<Self, Self::Error> {
141 proto.validate()?;
142 let message = proto.create()?;
143 message.try_into()
144 }
145}
146
147impl TryFrom<Message<ResourcePortMessage>> for ProtoStarMessage {
148
149 type Error = Error;
150
151 fn try_from(message: Message<ResourcePortMessage>) -> Result<Self, Self::Error> {
152 let mut proto = ProtoStarMessage::new();
153 proto.to = message.to.clone().into();
154 proto.trace = message.trace;
155 proto.log = message.log;
156 proto.payload = StarMessagePayload::MessagePayload(MessagePayload::PortRequest(message));
157 Ok(proto)
158 }
159}
160
161pub struct MessageReplyTracker {
162 pub reply_to: MessageId,
163 pub tx: broadcast::Sender<MessageUpdate>,
164}
165
166impl MessageReplyTracker {
167 pub fn on_message(&self, message: &StarMessage) -> TrackerJob {
168 match &message.payload {
169 StarMessagePayload::Reply(reply) => match reply {
170 SimpleReply::Ok(_reply) => {
171 self.tx.send(MessageUpdate::Result(MessageResult::Ok(
172 message.payload.clone(),
173 )));
174 TrackerJob::Done
175 }
176 SimpleReply::Fail(fail) => {
177 self.tx
178 .send(MessageUpdate::Result(MessageResult::Err(fail.to_string())));
179 TrackerJob::Done
180 }
181 SimpleReply::Ack(ack) => {
182 self.tx.send(MessageUpdate::Ack(ack.clone()));
183 TrackerJob::Continue
184 }
185 },
186 _ => TrackerJob::Continue,
187 }
188 }
189}
190
191pub enum TrackerJob {
192 Continue,
193 Done,
194}
195
196#[derive(Clone)]
197pub enum MessageUpdate {
198 Ack(MessageAck),
199 Result(MessageResult<StarMessagePayload>),
200}
201
202#[derive(Clone)]
203pub enum MessageResult<OK> {
204 Ok(OK),
205 Err(String),
206 Timeout,
207}
208
209impl<OK> ToString for MessageResult<OK> {
210 fn to_string(&self) -> String {
211 match self {
212 MessageResult::Ok(_) => "Ok".to_string(),
213 MessageResult::Err(err) => format!("Err({})", err),
214 MessageResult::Timeout => "Timeout".to_string(),
215 }
216 }
217}
218
219#[derive(Clone)]
220pub enum MessageExpect {
221 None,
222 Reply(ReplyKind),
223}
224
225#[derive(Clone)]
226pub enum MessageExpectWait {
227 Short,
228 Med,
229 Long,
230}
231
232impl MessageExpectWait {
233 pub fn wait_seconds(&self) -> u64 {
234 match self {
235 MessageExpectWait::Short => 5,
236 MessageExpectWait::Med => 10,
237 MessageExpectWait::Long => 30,
238 }
239 }
240
241 pub fn retries(&self) -> usize {
242 match self {
243 MessageExpectWait::Short => 5,
244 MessageExpectWait::Med => 10,
245 MessageExpectWait::Long => 15,
246 }
247 }
248}
249
250pub struct OkResultWaiter {
251 rx: broadcast::Receiver<MessageUpdate>,
252 tx: oneshot::Sender<StarMessagePayload>,
253}
254
255impl OkResultWaiter {
256 pub fn new(
257 rx: broadcast::Receiver<MessageUpdate>,
258 ) -> (Self, oneshot::Receiver<StarMessagePayload>) {
259 let (tx, osrx) = oneshot::channel();
260 (OkResultWaiter { rx: rx, tx: tx }, osrx)
261 }
262
263 pub async fn wait(mut self) {
264 tokio::spawn(async move {
265 loop {
266 if let Ok(MessageUpdate::Result(result)) = self.rx.recv().await {
267 match result {
268 MessageResult::Ok(payload) => {
269 self.tx.send(payload);
270 }
271 x => {
272 eprintln!(
273 "not expecting this results for OkResultWaiter...{} ",
274 x.to_string()
275 );
276 self.tx.send(StarMessagePayload::None);
277 }
278 }
279 break;
280 }
281 }
282 });
283 }
284}
285
286pub struct ResultWaiter {
287 rx: broadcast::Receiver<MessageUpdate>,
288 tx: oneshot::Sender<MessageResult<StarMessagePayload>>,
289}
290
291impl ResultWaiter {
292 pub fn new(
293 rx: broadcast::Receiver<MessageUpdate>,
294 ) -> (Self, oneshot::Receiver<MessageResult<StarMessagePayload>>) {
295 let (tx, osrx) = oneshot::channel();
296 (ResultWaiter { rx: rx, tx: tx }, osrx)
297 }
298
299 pub async fn wait(mut self) {
300 tokio::spawn(async move {
301 loop {
302 if let Ok(MessageUpdate::Result(result)) = self.rx.recv().await {
303 self.tx.send(result);
304 break;
305 }
306 }
307 });
308 }
309}
310
311#[derive(Debug, Clone, Serialize, Deserialize)]
312pub struct Reject {
313 pub reason: String,
314 pub kind: RejectKind,
315}
316
317#[derive(Debug, Clone, Serialize, Deserialize)]
318pub enum RejectKind {
319 Error,
320 Denied,
321 BadRequest,
322}
323
324fn hash_to_string(hash: &HashSet<ResourceType>) -> String {
325 let mut rtn = String::new();
326 for i in hash.iter() {
327 rtn.push_str(i.to_string().as_str());
328 rtn.push_str(", ");
329 }
330 rtn
331}
332
333impl From<Message<ResourceRequestMessage>> for ProtoStarMessage {
334 fn from(message: Message<ResourceRequestMessage>) -> Self {
335 let mut proto = ProtoStarMessage::new();
336 proto.to = message.to.clone().into();
337 proto.payload = StarMessagePayload::MessagePayload(MessagePayload::Request(message));
338 proto
339 }
340}
341
342impl From<MessageReply<ResourceResponseMessage>> for ProtoStarMessage {
343
344 fn from(reply: MessageReply<ResourceResponseMessage>) -> Self {
345 let mut proto = ProtoStarMessage::new();
346 proto.payload = StarMessagePayload::MessagePayload(MessagePayload::Response(reply));
347 proto
348 }
349}
350
351impl From<Message<ResourceRequestMessage>> for StarMessagePayload {
352 fn from( message: Message<ResourceRequestMessage> ) -> Self {
353 StarMessagePayload::MessagePayload(MessagePayload::Request(message))
354 }
355}
356
357impl From<MessageReply<ResourceResponseMessage>> for StarMessagePayload {
358 fn from( message: MessageReply<ResourceResponseMessage> ) -> Self {
359 StarMessagePayload::MessagePayload(MessagePayload::Response(message))
360 }
361}