starlane_core/
frame.rs

1use std::fmt;
2use std::fmt::{Debug, Formatter};
3
4use semver::SemVerError;
5use serde::{Deserialize, Serialize};
6use tokio::sync::{broadcast, mpsc, oneshot};
7use tokio::time::error::Elapsed;
8
9use starlane_resources::{AssignResourceStateSrc, ResourceAssign, ResourceCreate, ResourceIdentifier, ResourceSelector, ResourceStatus, ResourceStub, ResourceAddress, Labels};
10use starlane_resources::data::{BinSrc, DataSet};
11use starlane_resources::message::{Fail, Message, MessageId, MessageReply, RawState, ResourceRequestMessage, ResourceResponseMessage, ResourcePortMessage, ResourcePortReply};
12
13use crate::error::Error;
14use crate::id::Id;
15use crate::logger::Flags;
16use crate::message::{MessageExpect, MessageUpdate, ProtoStarMessage};
17use crate::message::resource::ActorMessage;
18use crate::star::{Star, StarCommand, StarInfo, StarKey, StarKind, StarNotify, StarSubGraphKey};
19use crate::watch::{Notification, Watch, WatchKey};
20use crate::resource::{ResourceId, ResourceRegistration, ResourceRecord, ResourceType, ResourceKey, ResourceSliceStatus,  UserKey, AppKey, ActorKey};
21use starlane_resources::property::ResourceValues;
22
23#[derive(Debug, Clone, Serialize, Deserialize,strum_macros::Display)]
24pub enum Frame {
25    Proto(ProtoFrame),
26    Diagnose(Diagnose),
27    SearchTraversal(SearchTraversal),
28    StarMessage(StarMessage),
29    Watch(WatchFrame),
30    Close,
31}
32
33
34#[derive(Debug, Clone, Serialize, Deserialize,strum_macros::Display)]
35pub enum WatchFrame {
36    Watch(Watch),
37    UnWatch(WatchKey),
38    Notify(Notification)
39}
40
41
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub enum SearchTraversal {
45    Up(SearchWindUp),
46    Down(SearchWindDown),
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub enum ProtoFrame {
51    StarLaneProtocolVersion(i32),
52    ReportStarKey(StarKey),
53    GatewaySelect,
54    GatewayAssign(Vec<StarSubGraphKey>),
55}
56
57
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct WatchInfo {
61    pub id: Id,
62    pub actor: ActorKey,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct StarMessageAck {
67    pub from: StarKey,
68    pub to: StarKey,
69    pub id: Id,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub enum Diagnose {
74    Ping,
75    Pong,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct SearchWindUp {
80    pub from: StarKey,
81    pub pattern: StarPattern,
82    pub hops: Vec<StarKey>,
83    pub transactions: Vec<u64>,
84    pub max_hops: usize,
85    pub action: TraversalAction,
86}
87
88impl SearchWindUp {
89    pub fn new(from: StarKey, pattern: StarPattern, action: TraversalAction) -> Self {
90        SearchWindUp {
91            from: from,
92            pattern: pattern,
93            action: action,
94            hops: vec![],
95            transactions: vec![],
96            max_hops: 255,
97        }
98    }
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub enum TraversalAction {
103    SearchHits,
104    Flags(Flags),
105}
106
107impl TraversalAction {
108    pub fn update(
109        &self,
110        mut new_hits: Vec<SearchHit>,
111        result: SearchResults,
112    ) -> Result<SearchResults, Error> {
113        match self {
114            TraversalAction::SearchHits => {
115                if let SearchResults::None = result {
116                    let mut hits = vec![];
117                    hits.append(&mut new_hits);
118                    Ok(SearchResults::Hits(hits))
119                } else if let SearchResults::Hits(mut old_hits) = result {
120                    let mut hits = vec![];
121                    hits.append(&mut old_hits);
122                    hits.append(&mut new_hits);
123                    Ok(SearchResults::Hits(hits))
124                } else {
125                    Err(
126                        "when action is SearchHIts, expecting WindResult::Hits or WindResult::None"
127                            .into(),
128                    )
129                }
130            }
131            TraversalAction::Flags(_flags) => Ok(SearchResults::None),
132        }
133    }
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub enum SearchResults {
138    None,
139    Hits(Vec<SearchHit>),
140}
141
142impl SearchWindUp {
143    pub fn inc(&mut self, hop: StarKey, transaction: u64) {
144        self.hops.push(hop);
145        self.transactions.push(transaction);
146    }
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize,strum_macros::Display)]
150pub enum StarPattern {
151    Any,
152    None,
153    StarKey(StarKey),
154    StarKind(StarKind),
155    StarKeySubgraph(Vec<StarSubGraphKey>)
156}
157
158impl StarPattern {
159    pub fn info_match(&self, info: &StarInfo) -> bool {
160        match self {
161            StarPattern::Any => true,
162            StarPattern::None => false,
163            StarPattern::StarKey(_) => {
164                self.key_match(&info.key)
165            }
166            StarPattern::StarKind(pattern) => *pattern == info.kind,
167            StarPattern::StarKeySubgraph(_) => {
168                self.key_match(&info.key)
169            }
170        }
171    }
172
173    pub fn key_match(&self, star: &StarKey) -> bool {
174        match self {
175            StarPattern::Any => true,
176            StarPattern::None => false,
177            StarPattern::StarKey(pattern) => *star == *pattern,
178            StarPattern::StarKind(_) => false,
179            StarPattern::StarKeySubgraph(pattern) => {
180                // TODO match tail end of subgraph
181                *pattern == star.subgraph
182            }
183        }
184    }
185
186
187    pub fn is_single_match(&self) -> bool {
188        match self {
189            StarPattern::StarKey(_) => true,
190            StarPattern::StarKind(_) => false,
191            StarPattern::Any => false,
192            StarPattern::None => false,
193            StarPattern::StarKeySubgraph(_) => false
194        }
195    }
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct SearchWindDown {
200    pub missed: Option<StarKey>,
201    pub result: SearchResults,
202    pub wind_up: SearchWindUp,
203    pub transactions: Vec<u64>,
204    pub hops: Vec<StarKey>,
205}
206
207impl SearchWindDown {
208    pub fn pop(&mut self) {
209        self.transactions.pop();
210        self.hops.pop();
211    }
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq)]
215pub struct SearchHit {
216    pub star: StarKey,
217    pub hops: usize,
218}
219
220#[derive(Debug, Clone, Serialize, Deserialize)]
221pub struct StarMessage {
222    pub from: StarKey,
223    pub to: StarKey,
224    pub id: MessageId,
225    pub payload: StarMessagePayload,
226    pub reply_to: Option<MessageId>,
227    pub trace: bool,
228    pub log: bool,
229}
230
231impl StarMessage {
232    pub fn new(id: MessageId, from: StarKey, to: StarKey, payload: StarMessagePayload) -> Self {
233        StarMessage {
234            id: id,
235            from: from,
236            to: to,
237            payload: payload,
238            reply_to: Option::None,
239            trace: false,
240            log: true,
241        }
242    }
243
244    pub fn to_central(id: MessageId, from: StarKey, payload: StarMessagePayload) -> Self {
245        StarMessage {
246            id: id,
247            from: from,
248            to: StarKey::central(),
249            payload: payload,
250            reply_to: Option::None,
251            trace: false,
252            log: false,
253        }
254    }
255
256    pub fn forward(&self, _to: &StarKey) -> ProtoStarMessage {
257        let mut proto = ProtoStarMessage::new();
258        proto.to = self.to.clone().into();
259        proto.payload = self.payload.clone();
260        proto
261    }
262
263    /*
264    pub async fn reply_tx(
265        self,
266        star_tx: mpsc::Sender<StarCommand>,
267    ) -> oneshot::Sender<StarMessagePayload> {
268        let message = self;
269        let (tx, rx) = oneshot::channel();
270        tokio::spawn(async move {
271            match rx.await {
272                Ok(payload) => {
273                    let proto = message.reply(payload);
274                    star_tx.send(StarCommand::SendProtoMessage(proto));
275                }
276                Err(_error) => {
277                    let proto = message.reply_err("no reply".to_string());
278                    star_tx.send(StarCommand::SendProtoMessage(proto));
279                }
280            }
281        });
282
283        tx
284    }
285
286     */
287
288    pub fn fail(&self, fail: Fail) -> ProtoStarMessage {
289        self.reply(StarMessagePayload::Reply(SimpleReply::Fail(fail)))
290    }
291
292    pub fn ok(&self, reply: Reply) -> ProtoStarMessage {
293        self.reply(StarMessagePayload::Reply(SimpleReply::Ok(reply)))
294    }
295
296    pub fn reply(&self, payload: StarMessagePayload) -> ProtoStarMessage {
297        let mut proto = ProtoStarMessage::new();
298        proto.to = self.from.clone().into();
299        proto.reply_to = Option::Some(self.id.clone());
300        proto.payload = payload;
301        proto
302    }
303
304    pub fn reply_err(&self, err: String) -> ProtoStarMessage {
305        let mut proto = ProtoStarMessage::new();
306        proto.to = self.from.clone().into();
307        proto.reply_to = Option::Some(self.id.clone());
308        proto.payload = StarMessagePayload::Reply(SimpleReply::Fail(Fail::Error(err)));
309        proto
310    }
311
312    pub fn reply_ok(&self, reply: Reply) -> ProtoStarMessage {
313        let mut proto = ProtoStarMessage::new();
314        proto.to = self.from.clone().into();
315        proto.reply_to = Option::Some(self.id.clone());
316        proto.payload = StarMessagePayload::Reply(SimpleReply::Ok(reply));
317        proto
318    }
319
320    pub fn resubmit(
321        self,
322        tx: broadcast::Sender<MessageUpdate>,
323        rx: broadcast::Receiver<MessageUpdate>,
324    ) -> ProtoStarMessage {
325        let mut proto = ProtoStarMessage::with_txrx(tx, rx);
326        proto.to = self.from.clone().into();
327        proto.reply_to = Option::Some(self.id.clone());
328        proto.payload = self.payload;
329        proto
330    }
331}
332
333#[derive(Clone, Serialize, Deserialize)]
334pub enum StarMessagePayload {
335    None,
336    MessagePayload(MessagePayload),
337    ResourceManager(RegistryAction),
338    ResourceHost(ResourceHostAction),
339    Space(SpaceMessage),
340    Reply(SimpleReply),
341    UniqueId(ResourceId),
342}
343
344impl Debug for StarMessagePayload {
345    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
346        f.write_str(match self {
347            StarMessagePayload::None => "None",
348            StarMessagePayload::MessagePayload(_) => "MessagePayload",
349            StarMessagePayload::ResourceManager(_) => "ResourceManager",
350            StarMessagePayload::ResourceHost(_) => "ResourceHost",
351            StarMessagePayload::Space(_) => "Space",
352            StarMessagePayload::Reply(_) => "Reply",
353            StarMessagePayload::UniqueId(_) => "UniqueId",
354        });
355        Ok(())
356    }
357}
358
359#[derive(Clone, Serialize, Deserialize)]
360pub enum MessagePayload {
361    Request(Message<ResourceRequestMessage>),
362    Response(MessageReply<ResourceResponseMessage>),
363    PortRequest(Message<ResourcePortMessage>),
364}
365
366#[derive(Clone, Serialize, Deserialize)]
367pub enum ResourceHostAction {
368    //IsHosting(ResourceKey),
369    Assign(ResourceAssign<AssignResourceStateSrc<DataSet<BinSrc>>>),
370    Init(ResourceKey),
371}
372
373#[derive(Debug, Clone, Serialize, Deserialize)]
374pub enum RegistryAction {
375    Register(ResourceRegistration),
376    Location(ResourceRecord),
377    Find(ResourceIdentifier),
378    Status(ResourceStatusReport),
379    UniqueResourceId {
380        parent: ResourceIdentifier,
381        child_type: ResourceType,
382    },
383}
384
385impl ToString for RegistryAction {
386    fn to_string(&self) -> String {
387        match self {
388            RegistryAction::Register(_) => "Register".to_string(),
389            RegistryAction::Location(_) => "Location".to_string(),
390            RegistryAction::Find(_) => "Find".to_string(),
391            RegistryAction::Status(_) => "Status".to_string(),
392            RegistryAction::UniqueResourceId { .. } => "UniqueResourceId".to_string(),
393        }
394    }
395}
396
397#[derive(Debug, Clone, Serialize, Deserialize)]
398pub struct ResourceStatusReport {
399    pub key: ResourceKey,
400    pub status: ResourceStatus,
401}
402
403#[derive(Clone, Serialize, Deserialize)]
404pub struct ResourceSliceStatusReport {
405    pub key: ResourceKey,
406    pub status: ResourceSliceStatus,
407}
408
409#[derive(Clone, Serialize, Deserialize)]
410pub enum SimpleReply {
411    Ok(Reply),
412    Fail(Fail),
413    Ack(MessageAck),
414}
415
416impl ToString for SimpleReply {
417    fn to_string(&self) -> String {
418        match self {
419            SimpleReply::Ok(ok) => format!("Ok({})", ok.to_string()),
420            SimpleReply::Fail(fail) => format!("Fail({})", fail.to_string()),
421            SimpleReply::Ack(_ack) => "Ack".to_string(),
422        }
423    }
424}
425
426impl StarMessagePayload {
427    pub fn is_ack(&self) -> bool {
428        match self {
429            StarMessagePayload::Reply(reply) => match reply {
430                SimpleReply::Ack(_) => true,
431                _ => false,
432            },
433            _ => false,
434        }
435    }
436}
437
438#[derive(Clone, Serialize, Deserialize, strum_macros::Display)]
439pub enum Reply {
440    Empty,
441    Key(ResourceKey),
442    Address(ResourceAddress),
443    Records(Vec<ResourceRecord>),
444    Record(ResourceRecord),
445    Message(MessageReply<ResourceResponseMessage>),
446    Id(ResourceId),
447    State(DataSet<BinSrc>),
448    ResourceValues(ResourceValues<ResourceStub>),
449    Seq(u64),
450    Port(DataSet<BinSrc>)
451}
452
453#[derive(Clone, Eq, PartialEq, strum_macros::Display)]
454pub enum ReplyKind {
455    Empty,
456    Key,
457    Address,
458    Records,
459    Record,
460    Message,
461    Id,
462    Seq,
463    State,
464    Port,
465    ResourceValues
466}
467
468impl ReplyKind {
469    pub fn is_match(&self, reply: &Reply) -> bool {
470        match reply {
471            Reply::Empty => *self == Self::Empty,
472            Reply::Key(_) => *self == Self::Key,
473            Reply::Address(_) => *self == Self::Address,
474            Reply::Records(_) => *self == Self::Records,
475            Reply::Record(_) => *self == Self::Record,
476            Reply::Message(_) => *self == Self::Message,
477            Reply::Id(_) => *self == Self::Id,
478            Reply::Seq(_) => *self == Self::Seq,
479            Reply::State(_) => *self == Self::State,
480            Reply::Port(_) => *self == Self::Port,
481            Reply::ResourceValues(_) => *self == Self::ResourceValues
482        }
483    }
484}
485
486#[derive(Clone, Serialize, Deserialize)]
487pub enum SequenceMessage {
488    Request,
489    Response(u64),
490}
491
492#[derive(Clone, Serialize, Deserialize)]
493pub struct MessageAck {
494    pub id: Id,
495    pub kind: MessageAckKind,
496}
497
498#[derive(Clone, Serialize, Deserialize)]
499pub enum MessageAckKind {
500    Hop(StarKey),
501    Received,
502    Processing,
503}
504
505#[derive(Clone, Serialize, Deserialize)]
506pub struct SpaceMessage {
507    pub user: UserKey,
508    pub payload: SpacePayload,
509}
510
511impl SpaceMessage {
512    pub fn with_payload(&self, payload: SpacePayload) -> Self {
513        SpaceMessage {
514            user: self.user.clone(),
515            payload: payload,
516        }
517    }
518}
519
520#[derive(Clone, Serialize, Deserialize)]
521pub enum SpacePayload {
522    Reply(SpaceReply),
523    Server(ServerPayload),
524    Supervisor(SupervisorPayload),
525}
526
527#[derive(Clone, Serialize, Deserialize)]
528pub enum SupervisorPayload {
529    AppSequenceRequest(AppKey),
530}
531
532#[derive(Clone, Serialize, Deserialize)]
533pub enum ServerPayload {
534    SequenceResponse(u64),
535}
536
537#[derive(Clone, Serialize, Deserialize)]
538pub enum SpaceReply {
539    AppSequenceResponse(u64),
540}
541
542#[derive(Clone, Serialize, Deserialize)]
543pub enum AssignMessage {}
544
545#[derive(Clone, Serialize, Deserialize)]
546pub struct AppLabelRequest {
547    pub app: AppKey,
548    pub labels: Labels,
549}
550
551#[derive(Clone, Serialize, Deserialize)]
552pub enum Event {
553    App(AppEvent),
554    Actor(ActorEvent),
555    Star(StarEvent),
556}
557
558#[derive(Clone, Serialize, Deserialize)]
559pub enum ActorEvent {
560    StateChange(RawState),
561    Gathered(ActorGathered),
562    Scattered(ActorScattered),
563    Broadcast(ActorBroadcast),
564    Destroyed,
565}
566
567#[derive(Clone, Serialize, Deserialize)]
568pub enum AppEvent {
569    Created,
570    Ready,
571    Destroyed,
572}
573
574#[derive(Clone, Serialize, Deserialize)]
575pub enum StarEvent {
576    Lane(LaneEvent),
577}
578
579#[derive(Clone, Serialize, Deserialize)]
580pub struct LaneEvent {
581    pub star: StarKey,
582    pub kind: LaneEventKind,
583}
584
585#[derive(Clone, Serialize, Deserialize)]
586pub enum LaneEventKind {
587    Connect,
588    Disconnect,
589}
590
591#[derive(Clone, Serialize, Deserialize)]
592pub struct ActorGathered {
593    pub to: ResourceKey,
594}
595
596#[derive(Clone, Serialize, Deserialize)]
597pub struct ActorScattered {
598    pub from: ResourceKey,
599}
600
601#[derive(Clone, Serialize, Deserialize)]
602pub struct ActorBroadcast {
603    pub topic: String,
604    pub data: Vec<u8>,
605}
606
607#[derive(Clone, Serialize, Deserialize)]
608pub struct ActorLocationRequest {
609    pub lookup: ActorLookup,
610}
611
612#[derive(Clone, Serialize, Deserialize)]
613pub struct ActorLocationReport {
614    pub resource: ResourceKey,
615    pub location: ResourceRecord,
616}
617
618#[derive(Clone, Serialize, Deserialize)]
619pub enum ActorLookup {
620    Key(ActorKey),
621}
622
623#[derive(Clone, Serialize, Deserialize)]
624pub struct Rejection {
625    pub message: String,
626}
627
628impl fmt::Display for Diagnose {
629    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
630        let r = match self {
631            Diagnose::Ping => "Ping",
632            Diagnose::Pong => "Pong",
633        };
634        write!(f, "{}", r)
635    }
636}
637
638impl fmt::Display for StarMessagePayload {
639    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
640        let r = match self {
641            StarMessagePayload::None => "None".to_string(),
642            StarMessagePayload::Space(_) => "Space".to_string(),
643            StarMessagePayload::Reply(reply) => format!("Reply({})", reply.to_string()),
644            StarMessagePayload::ResourceManager(_) => "ResourceManager".to_string(),
645            StarMessagePayload::ResourceHost(_) => "ResourceHost".to_string(),
646            StarMessagePayload::UniqueId(_) => "UniqueId".to_string(),
647            StarMessagePayload::MessagePayload(_) => "MessagePayload".to_string(),
648        };
649        write!(f, "{}", r)
650    }
651}
652
653
654impl fmt::Display for SearchTraversal {
655    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
656        let r = match self {
657            SearchTraversal::Up(up) => format!("Up({})", &up.pattern.to_string()).to_string(),
658            SearchTraversal::Down(_) => "Down".to_string(),
659        };
660        write!(f, "{}", r)
661    }
662}
663
664
665
666impl fmt::Display for ProtoFrame {
667    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668        let r = match self {
669            ProtoFrame::StarLaneProtocolVersion(version) => {
670                format!("StarLaneProtocolVersion({})", version).to_string()
671            }
672            ProtoFrame::ReportStarKey(key) => {
673                format!("ReportStarKey({})", key.to_string()).to_string()
674            }
675            ProtoFrame::GatewaySelect => format!("GatewaySelect").to_string(),
676            ProtoFrame::GatewayAssign { .. } => "GatewayAssign".to_string(),
677        };
678        write!(f, "{}", r)
679    }
680}
681
682
683