1use std::fmt;
2use std::fmt::{Debug, Formatter};
3
4use semver::SemVerError;
5use serde::{Deserialize, Serialize};
6use tokio::sync::{broadcast, mpsc, oneshot};
7use tokio::time::error::Elapsed;
8
9use starlane_resources::{AssignResourceStateSrc, ResourceAssign, ResourceCreate, ResourceIdentifier, ResourceSelector, ResourceStatus, ResourceStub, ResourceAddress, Labels};
10use starlane_resources::data::{BinSrc, DataSet};
11use starlane_resources::message::{Fail, Message, MessageId, MessageReply, RawState, ResourceRequestMessage, ResourceResponseMessage, ResourcePortMessage, ResourcePortReply};
12
13use crate::error::Error;
14use crate::id::Id;
15use crate::logger::Flags;
16use crate::message::{MessageExpect, MessageUpdate, ProtoStarMessage};
17use crate::message::resource::ActorMessage;
18use crate::star::{Star, StarCommand, StarInfo, StarKey, StarKind, StarNotify, StarSubGraphKey};
19use crate::watch::{Notification, Watch, WatchKey};
20use crate::resource::{ResourceId, ResourceRegistration, ResourceRecord, ResourceType, ResourceKey, ResourceSliceStatus, UserKey, AppKey, ActorKey};
21use starlane_resources::property::ResourceValues;
22
23#[derive(Debug, Clone, Serialize, Deserialize,strum_macros::Display)]
24pub enum Frame {
25 Proto(ProtoFrame),
26 Diagnose(Diagnose),
27 SearchTraversal(SearchTraversal),
28 StarMessage(StarMessage),
29 Watch(WatchFrame),
30 Close,
31}
32
33
34#[derive(Debug, Clone, Serialize, Deserialize,strum_macros::Display)]
35pub enum WatchFrame {
36 Watch(Watch),
37 UnWatch(WatchKey),
38 Notify(Notification)
39}
40
41
42
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub enum SearchTraversal {
45 Up(SearchWindUp),
46 Down(SearchWindDown),
47}
48
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub enum ProtoFrame {
51 StarLaneProtocolVersion(i32),
52 ReportStarKey(StarKey),
53 GatewaySelect,
54 GatewayAssign(Vec<StarSubGraphKey>),
55}
56
57
58
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct WatchInfo {
61 pub id: Id,
62 pub actor: ActorKey,
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
66pub struct StarMessageAck {
67 pub from: StarKey,
68 pub to: StarKey,
69 pub id: Id,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
73pub enum Diagnose {
74 Ping,
75 Pong,
76}
77
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub struct SearchWindUp {
80 pub from: StarKey,
81 pub pattern: StarPattern,
82 pub hops: Vec<StarKey>,
83 pub transactions: Vec<u64>,
84 pub max_hops: usize,
85 pub action: TraversalAction,
86}
87
88impl SearchWindUp {
89 pub fn new(from: StarKey, pattern: StarPattern, action: TraversalAction) -> Self {
90 SearchWindUp {
91 from: from,
92 pattern: pattern,
93 action: action,
94 hops: vec![],
95 transactions: vec![],
96 max_hops: 255,
97 }
98 }
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub enum TraversalAction {
103 SearchHits,
104 Flags(Flags),
105}
106
107impl TraversalAction {
108 pub fn update(
109 &self,
110 mut new_hits: Vec<SearchHit>,
111 result: SearchResults,
112 ) -> Result<SearchResults, Error> {
113 match self {
114 TraversalAction::SearchHits => {
115 if let SearchResults::None = result {
116 let mut hits = vec![];
117 hits.append(&mut new_hits);
118 Ok(SearchResults::Hits(hits))
119 } else if let SearchResults::Hits(mut old_hits) = result {
120 let mut hits = vec![];
121 hits.append(&mut old_hits);
122 hits.append(&mut new_hits);
123 Ok(SearchResults::Hits(hits))
124 } else {
125 Err(
126 "when action is SearchHIts, expecting WindResult::Hits or WindResult::None"
127 .into(),
128 )
129 }
130 }
131 TraversalAction::Flags(_flags) => Ok(SearchResults::None),
132 }
133 }
134}
135
136#[derive(Debug, Clone, Serialize, Deserialize)]
137pub enum SearchResults {
138 None,
139 Hits(Vec<SearchHit>),
140}
141
142impl SearchWindUp {
143 pub fn inc(&mut self, hop: StarKey, transaction: u64) {
144 self.hops.push(hop);
145 self.transactions.push(transaction);
146 }
147}
148
149#[derive(Debug, Clone, Serialize, Deserialize,strum_macros::Display)]
150pub enum StarPattern {
151 Any,
152 None,
153 StarKey(StarKey),
154 StarKind(StarKind),
155 StarKeySubgraph(Vec<StarSubGraphKey>)
156}
157
158impl StarPattern {
159 pub fn info_match(&self, info: &StarInfo) -> bool {
160 match self {
161 StarPattern::Any => true,
162 StarPattern::None => false,
163 StarPattern::StarKey(_) => {
164 self.key_match(&info.key)
165 }
166 StarPattern::StarKind(pattern) => *pattern == info.kind,
167 StarPattern::StarKeySubgraph(_) => {
168 self.key_match(&info.key)
169 }
170 }
171 }
172
173 pub fn key_match(&self, star: &StarKey) -> bool {
174 match self {
175 StarPattern::Any => true,
176 StarPattern::None => false,
177 StarPattern::StarKey(pattern) => *star == *pattern,
178 StarPattern::StarKind(_) => false,
179 StarPattern::StarKeySubgraph(pattern) => {
180 *pattern == star.subgraph
182 }
183 }
184 }
185
186
187 pub fn is_single_match(&self) -> bool {
188 match self {
189 StarPattern::StarKey(_) => true,
190 StarPattern::StarKind(_) => false,
191 StarPattern::Any => false,
192 StarPattern::None => false,
193 StarPattern::StarKeySubgraph(_) => false
194 }
195 }
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
199pub struct SearchWindDown {
200 pub missed: Option<StarKey>,
201 pub result: SearchResults,
202 pub wind_up: SearchWindUp,
203 pub transactions: Vec<u64>,
204 pub hops: Vec<StarKey>,
205}
206
207impl SearchWindDown {
208 pub fn pop(&mut self) {
209 self.transactions.pop();
210 self.hops.pop();
211 }
212}
213
214#[derive(Debug, Clone, Serialize, Deserialize, Hash, Eq, PartialEq)]
215pub struct SearchHit {
216 pub star: StarKey,
217 pub hops: usize,
218}
219
220#[derive(Debug, Clone, Serialize, Deserialize)]
221pub struct StarMessage {
222 pub from: StarKey,
223 pub to: StarKey,
224 pub id: MessageId,
225 pub payload: StarMessagePayload,
226 pub reply_to: Option<MessageId>,
227 pub trace: bool,
228 pub log: bool,
229}
230
231impl StarMessage {
232 pub fn new(id: MessageId, from: StarKey, to: StarKey, payload: StarMessagePayload) -> Self {
233 StarMessage {
234 id: id,
235 from: from,
236 to: to,
237 payload: payload,
238 reply_to: Option::None,
239 trace: false,
240 log: true,
241 }
242 }
243
244 pub fn to_central(id: MessageId, from: StarKey, payload: StarMessagePayload) -> Self {
245 StarMessage {
246 id: id,
247 from: from,
248 to: StarKey::central(),
249 payload: payload,
250 reply_to: Option::None,
251 trace: false,
252 log: false,
253 }
254 }
255
256 pub fn forward(&self, _to: &StarKey) -> ProtoStarMessage {
257 let mut proto = ProtoStarMessage::new();
258 proto.to = self.to.clone().into();
259 proto.payload = self.payload.clone();
260 proto
261 }
262
263 pub fn fail(&self, fail: Fail) -> ProtoStarMessage {
289 self.reply(StarMessagePayload::Reply(SimpleReply::Fail(fail)))
290 }
291
292 pub fn ok(&self, reply: Reply) -> ProtoStarMessage {
293 self.reply(StarMessagePayload::Reply(SimpleReply::Ok(reply)))
294 }
295
296 pub fn reply(&self, payload: StarMessagePayload) -> ProtoStarMessage {
297 let mut proto = ProtoStarMessage::new();
298 proto.to = self.from.clone().into();
299 proto.reply_to = Option::Some(self.id.clone());
300 proto.payload = payload;
301 proto
302 }
303
304 pub fn reply_err(&self, err: String) -> ProtoStarMessage {
305 let mut proto = ProtoStarMessage::new();
306 proto.to = self.from.clone().into();
307 proto.reply_to = Option::Some(self.id.clone());
308 proto.payload = StarMessagePayload::Reply(SimpleReply::Fail(Fail::Error(err)));
309 proto
310 }
311
312 pub fn reply_ok(&self, reply: Reply) -> ProtoStarMessage {
313 let mut proto = ProtoStarMessage::new();
314 proto.to = self.from.clone().into();
315 proto.reply_to = Option::Some(self.id.clone());
316 proto.payload = StarMessagePayload::Reply(SimpleReply::Ok(reply));
317 proto
318 }
319
320 pub fn resubmit(
321 self,
322 tx: broadcast::Sender<MessageUpdate>,
323 rx: broadcast::Receiver<MessageUpdate>,
324 ) -> ProtoStarMessage {
325 let mut proto = ProtoStarMessage::with_txrx(tx, rx);
326 proto.to = self.from.clone().into();
327 proto.reply_to = Option::Some(self.id.clone());
328 proto.payload = self.payload;
329 proto
330 }
331}
332
333#[derive(Clone, Serialize, Deserialize)]
334pub enum StarMessagePayload {
335 None,
336 MessagePayload(MessagePayload),
337 ResourceManager(RegistryAction),
338 ResourceHost(ResourceHostAction),
339 Space(SpaceMessage),
340 Reply(SimpleReply),
341 UniqueId(ResourceId),
342}
343
344impl Debug for StarMessagePayload {
345 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
346 f.write_str(match self {
347 StarMessagePayload::None => "None",
348 StarMessagePayload::MessagePayload(_) => "MessagePayload",
349 StarMessagePayload::ResourceManager(_) => "ResourceManager",
350 StarMessagePayload::ResourceHost(_) => "ResourceHost",
351 StarMessagePayload::Space(_) => "Space",
352 StarMessagePayload::Reply(_) => "Reply",
353 StarMessagePayload::UniqueId(_) => "UniqueId",
354 });
355 Ok(())
356 }
357}
358
359#[derive(Clone, Serialize, Deserialize)]
360pub enum MessagePayload {
361 Request(Message<ResourceRequestMessage>),
362 Response(MessageReply<ResourceResponseMessage>),
363 PortRequest(Message<ResourcePortMessage>),
364}
365
366#[derive(Clone, Serialize, Deserialize)]
367pub enum ResourceHostAction {
368 Assign(ResourceAssign<AssignResourceStateSrc<DataSet<BinSrc>>>),
370 Init(ResourceKey),
371}
372
373#[derive(Debug, Clone, Serialize, Deserialize)]
374pub enum RegistryAction {
375 Register(ResourceRegistration),
376 Location(ResourceRecord),
377 Find(ResourceIdentifier),
378 Status(ResourceStatusReport),
379 UniqueResourceId {
380 parent: ResourceIdentifier,
381 child_type: ResourceType,
382 },
383}
384
385impl ToString for RegistryAction {
386 fn to_string(&self) -> String {
387 match self {
388 RegistryAction::Register(_) => "Register".to_string(),
389 RegistryAction::Location(_) => "Location".to_string(),
390 RegistryAction::Find(_) => "Find".to_string(),
391 RegistryAction::Status(_) => "Status".to_string(),
392 RegistryAction::UniqueResourceId { .. } => "UniqueResourceId".to_string(),
393 }
394 }
395}
396
397#[derive(Debug, Clone, Serialize, Deserialize)]
398pub struct ResourceStatusReport {
399 pub key: ResourceKey,
400 pub status: ResourceStatus,
401}
402
403#[derive(Clone, Serialize, Deserialize)]
404pub struct ResourceSliceStatusReport {
405 pub key: ResourceKey,
406 pub status: ResourceSliceStatus,
407}
408
409#[derive(Clone, Serialize, Deserialize)]
410pub enum SimpleReply {
411 Ok(Reply),
412 Fail(Fail),
413 Ack(MessageAck),
414}
415
416impl ToString for SimpleReply {
417 fn to_string(&self) -> String {
418 match self {
419 SimpleReply::Ok(ok) => format!("Ok({})", ok.to_string()),
420 SimpleReply::Fail(fail) => format!("Fail({})", fail.to_string()),
421 SimpleReply::Ack(_ack) => "Ack".to_string(),
422 }
423 }
424}
425
426impl StarMessagePayload {
427 pub fn is_ack(&self) -> bool {
428 match self {
429 StarMessagePayload::Reply(reply) => match reply {
430 SimpleReply::Ack(_) => true,
431 _ => false,
432 },
433 _ => false,
434 }
435 }
436}
437
438#[derive(Clone, Serialize, Deserialize, strum_macros::Display)]
439pub enum Reply {
440 Empty,
441 Key(ResourceKey),
442 Address(ResourceAddress),
443 Records(Vec<ResourceRecord>),
444 Record(ResourceRecord),
445 Message(MessageReply<ResourceResponseMessage>),
446 Id(ResourceId),
447 State(DataSet<BinSrc>),
448 ResourceValues(ResourceValues<ResourceStub>),
449 Seq(u64),
450 Port(DataSet<BinSrc>)
451}
452
453#[derive(Clone, Eq, PartialEq, strum_macros::Display)]
454pub enum ReplyKind {
455 Empty,
456 Key,
457 Address,
458 Records,
459 Record,
460 Message,
461 Id,
462 Seq,
463 State,
464 Port,
465 ResourceValues
466}
467
468impl ReplyKind {
469 pub fn is_match(&self, reply: &Reply) -> bool {
470 match reply {
471 Reply::Empty => *self == Self::Empty,
472 Reply::Key(_) => *self == Self::Key,
473 Reply::Address(_) => *self == Self::Address,
474 Reply::Records(_) => *self == Self::Records,
475 Reply::Record(_) => *self == Self::Record,
476 Reply::Message(_) => *self == Self::Message,
477 Reply::Id(_) => *self == Self::Id,
478 Reply::Seq(_) => *self == Self::Seq,
479 Reply::State(_) => *self == Self::State,
480 Reply::Port(_) => *self == Self::Port,
481 Reply::ResourceValues(_) => *self == Self::ResourceValues
482 }
483 }
484}
485
486#[derive(Clone, Serialize, Deserialize)]
487pub enum SequenceMessage {
488 Request,
489 Response(u64),
490}
491
492#[derive(Clone, Serialize, Deserialize)]
493pub struct MessageAck {
494 pub id: Id,
495 pub kind: MessageAckKind,
496}
497
498#[derive(Clone, Serialize, Deserialize)]
499pub enum MessageAckKind {
500 Hop(StarKey),
501 Received,
502 Processing,
503}
504
505#[derive(Clone, Serialize, Deserialize)]
506pub struct SpaceMessage {
507 pub user: UserKey,
508 pub payload: SpacePayload,
509}
510
511impl SpaceMessage {
512 pub fn with_payload(&self, payload: SpacePayload) -> Self {
513 SpaceMessage {
514 user: self.user.clone(),
515 payload: payload,
516 }
517 }
518}
519
520#[derive(Clone, Serialize, Deserialize)]
521pub enum SpacePayload {
522 Reply(SpaceReply),
523 Server(ServerPayload),
524 Supervisor(SupervisorPayload),
525}
526
527#[derive(Clone, Serialize, Deserialize)]
528pub enum SupervisorPayload {
529 AppSequenceRequest(AppKey),
530}
531
532#[derive(Clone, Serialize, Deserialize)]
533pub enum ServerPayload {
534 SequenceResponse(u64),
535}
536
537#[derive(Clone, Serialize, Deserialize)]
538pub enum SpaceReply {
539 AppSequenceResponse(u64),
540}
541
542#[derive(Clone, Serialize, Deserialize)]
543pub enum AssignMessage {}
544
545#[derive(Clone, Serialize, Deserialize)]
546pub struct AppLabelRequest {
547 pub app: AppKey,
548 pub labels: Labels,
549}
550
551#[derive(Clone, Serialize, Deserialize)]
552pub enum Event {
553 App(AppEvent),
554 Actor(ActorEvent),
555 Star(StarEvent),
556}
557
558#[derive(Clone, Serialize, Deserialize)]
559pub enum ActorEvent {
560 StateChange(RawState),
561 Gathered(ActorGathered),
562 Scattered(ActorScattered),
563 Broadcast(ActorBroadcast),
564 Destroyed,
565}
566
567#[derive(Clone, Serialize, Deserialize)]
568pub enum AppEvent {
569 Created,
570 Ready,
571 Destroyed,
572}
573
574#[derive(Clone, Serialize, Deserialize)]
575pub enum StarEvent {
576 Lane(LaneEvent),
577}
578
579#[derive(Clone, Serialize, Deserialize)]
580pub struct LaneEvent {
581 pub star: StarKey,
582 pub kind: LaneEventKind,
583}
584
585#[derive(Clone, Serialize, Deserialize)]
586pub enum LaneEventKind {
587 Connect,
588 Disconnect,
589}
590
591#[derive(Clone, Serialize, Deserialize)]
592pub struct ActorGathered {
593 pub to: ResourceKey,
594}
595
596#[derive(Clone, Serialize, Deserialize)]
597pub struct ActorScattered {
598 pub from: ResourceKey,
599}
600
601#[derive(Clone, Serialize, Deserialize)]
602pub struct ActorBroadcast {
603 pub topic: String,
604 pub data: Vec<u8>,
605}
606
607#[derive(Clone, Serialize, Deserialize)]
608pub struct ActorLocationRequest {
609 pub lookup: ActorLookup,
610}
611
612#[derive(Clone, Serialize, Deserialize)]
613pub struct ActorLocationReport {
614 pub resource: ResourceKey,
615 pub location: ResourceRecord,
616}
617
618#[derive(Clone, Serialize, Deserialize)]
619pub enum ActorLookup {
620 Key(ActorKey),
621}
622
623#[derive(Clone, Serialize, Deserialize)]
624pub struct Rejection {
625 pub message: String,
626}
627
628impl fmt::Display for Diagnose {
629 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
630 let r = match self {
631 Diagnose::Ping => "Ping",
632 Diagnose::Pong => "Pong",
633 };
634 write!(f, "{}", r)
635 }
636}
637
638impl fmt::Display for StarMessagePayload {
639 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
640 let r = match self {
641 StarMessagePayload::None => "None".to_string(),
642 StarMessagePayload::Space(_) => "Space".to_string(),
643 StarMessagePayload::Reply(reply) => format!("Reply({})", reply.to_string()),
644 StarMessagePayload::ResourceManager(_) => "ResourceManager".to_string(),
645 StarMessagePayload::ResourceHost(_) => "ResourceHost".to_string(),
646 StarMessagePayload::UniqueId(_) => "UniqueId".to_string(),
647 StarMessagePayload::MessagePayload(_) => "MessagePayload".to_string(),
648 };
649 write!(f, "{}", r)
650 }
651}
652
653
654impl fmt::Display for SearchTraversal {
655 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
656 let r = match self {
657 SearchTraversal::Up(up) => format!("Up({})", &up.pattern.to_string()).to_string(),
658 SearchTraversal::Down(_) => "Down".to_string(),
659 };
660 write!(f, "{}", r)
661 }
662}
663
664
665
666impl fmt::Display for ProtoFrame {
667 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
668 let r = match self {
669 ProtoFrame::StarLaneProtocolVersion(version) => {
670 format!("StarLaneProtocolVersion({})", version).to_string()
671 }
672 ProtoFrame::ReportStarKey(key) => {
673 format!("ReportStarKey({})", key.to_string()).to_string()
674 }
675 ProtoFrame::GatewaySelect => format!("GatewaySelect").to_string(),
676 ProtoFrame::GatewayAssign { .. } => "GatewayAssign".to_string(),
677 };
678 write!(f, "{}", r)
679 }
680}
681
682
683