starlane_core/
proto.rs

1use std::cell::Cell;
2use std::collections::{HashMap, HashSet};
3use std::convert::TryInto;
4use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
5use std::sync::Arc;
6
7use futures::future::select_all;
8use futures::prelude::*;
9use futures::FutureExt;
10use tokio::sync::mpsc::{Receiver, Sender};
11use tokio::sync::{broadcast, mpsc};
12use tokio::time::{Duration, Instant};
13
14use crate::cache::ProtoArtifactCachesFactory;
15use crate::constellation::ConstellationStatus;
16use crate::error::Error;
17use crate::file_access::FileAccess;
18use crate::frame::{
19    Frame, ProtoFrame, SequenceMessage, StarMessage, StarMessagePayload, StarPattern, SearchWindDown,
20    SearchHit, SearchWindUp,
21};
22use crate::lane::{
23    ConnectorController, LaneCommand, LaneEnd, LaneIndex, LaneMeta, LaneWrapper,
24    ProtoLaneEnd, TunnelConnector, TunnelIn, TunnelOut, TunnelOutState,
25    STARLANE_PROTOCOL_VERSION,
26};
27use crate::logger::{Flag, Flags, Log, Logger, ProtoStarLog, ProtoStarLogPayload, StarFlag};
28use crate::permissions::AuthTokenSource;
29use crate::star::core::message::MessagingEndpointComponent;
30use crate::star::shell::lanes::{LaneMuxerApi, LaneMuxer, LanePattern};
31use crate::star::shell::search::{StarSearchApi, StarSearchComponent, StarSearchTransaction, ShortestPathStarKey};
32use crate::star::shell::message::{MessagingApi, MessagingComponent};
33use crate::star::shell::pledge::StarWranglerBacking;
34use crate::star::shell::router::{RouterApi, RouterComponent, RouterCall};
35use crate::star::surface::{SurfaceApi, SurfaceCall, SurfaceComponent};
36use crate::star::variant::{VariantApi, start_variant};
37use crate::star::{
38    ConstellationBroadcast, FrameHold, FrameTimeoutInner, Persistence, ResourceRegistryBacking,
39    ResourceRegistryBackingSqLite, Star, StarCommand, StarController,
40    StarInfo, StarKernel, StarKey, StarKind, StarSkel,
41};
42use crate::starlane::StarlaneMachine;
43use crate::template::StarKeyConstellationIndex;
44use crate::star::shell::locator::{ResourceLocatorApi, ResourceLocatorComponent};
45use crate::star::shell::golden::{GoldenPathApi, GoldenPathComponent};
46use crate::star::shell::watch::{WatchApi, WatchComponent};
47
48
49pub struct ProtoStar {
50    star_key: ProtoStarKey,
51    sequence: Arc<AtomicU64>,
52    kind: StarKind,
53    star_tx: mpsc::Sender<StarCommand>,
54    star_rx: mpsc::Receiver<StarCommand>,
55    surface_api: SurfaceApi,
56    surface_rx: mpsc::Receiver<SurfaceCall>,
57    lanes: HashMap<StarKey, LaneWrapper>,
58    proto_lanes: Vec<LaneWrapper>,
59    connector_ctrls: Vec<ConnectorController>,
60    //  star_core_ext_factory: Arc<dyn StarCoreExtFactory>,
61    logger: Logger,
62    frame_hold: FrameHold,
63    data_access: FileAccess,
64    proto_constellation_broadcast: Cell<Option<broadcast::Receiver<ConstellationBroadcast>>>,
65    constellation_status: ConstellationStatus,
66    flags: Flags,
67    tracker: ProtoTracker,
68    machine: StarlaneMachine,
69    lane_muxer_api: LaneMuxerApi,
70    router_tx: mpsc::Sender<RouterCall>,
71    router_booster_rx: RouterCallBooster
72}
73
74impl ProtoStar {
75    pub fn new(
76        key: ProtoStarKey,
77        kind: StarKind,
78        star_tx: Sender<StarCommand>,
79        star_rx: Receiver<StarCommand>,
80        surface_api: SurfaceApi,
81        surface_rx: mpsc::Receiver<SurfaceCall>,
82        data_access: FileAccess,
83        proto_constellation_broadcast: broadcast::Receiver<ConstellationBroadcast>,
84        flags: Flags,
85        logger: Logger,
86        machine: StarlaneMachine,
87    ) -> (Self, StarController) {
88        let (router_tx,router_rx) = mpsc::channel(1024);
89        let router_booster_rx = RouterCallBooster { router_rx };
90        let lane_muxer_api = LaneMuxer::start(router_tx.clone());
91        (
92
93            ProtoStar {
94                star_key: key,
95                sequence: Arc::new(AtomicU64::new(0)),
96                kind,
97                star_tx: star_tx.clone(),
98                star_rx,
99                lanes: HashMap::new(),
100                proto_lanes: vec![],
101                connector_ctrls: vec![],
102                logger: logger,
103                frame_hold: FrameHold::new(),
104                data_access: data_access,
105                proto_constellation_broadcast: Cell::new(Option::Some(
106                    proto_constellation_broadcast,
107                )),
108                tracker: ProtoTracker::new(),
109                flags: flags,
110                constellation_status: ConstellationStatus::Unknown,
111                machine: machine,
112                surface_api: surface_api.clone(),
113                surface_rx,
114                lane_muxer_api,
115                router_tx,
116                router_booster_rx
117            },
118            StarController {
119                star_tx,
120                surface_api,
121            },
122        )
123    }
124
125    pub async fn evolve(mut self) -> Result<Star, Error> {
126        let mut proto_constellation_broadcast = self
127            .proto_constellation_broadcast
128            .replace(Option::None)
129            .ok_or("expected proto_constellation_broadcast to be Option::Some()")?;
130
131        let star_tx = self.star_tx.clone();
132        tokio::spawn(async move {
133            while let Result::Ok(broadcast) = proto_constellation_broadcast.recv().await {
134                star_tx
135                    .send(StarCommand::ConstellationBroadcast(broadcast))
136                    .await;
137            }
138        });
139
140        loop {
141
142            let mut futures = vec![];
143            futures.push(self.star_rx.recv().boxed() );
144            futures.push(self.router_booster_rx.boost().boxed());
145            let (call,_,_) = select_all(futures).await;
146
147            if let Some(call) = call{
148                match call {
149                    StarCommand::GetStarInfo(tx) => match &self.star_key {
150                        ProtoStarKey::Key(key) => {
151                            tx.send(Option::Some(StarInfo {
152                                key: key.clone(),
153                                kind: self.kind.clone(),
154                            }));
155                        }
156                        ProtoStarKey::RequestSubKeyExpansion(_) => {
157                            tx.send(Option::None);
158                        }
159                    },
160                    StarCommand::ConstellationBroadcast(broadcast) => match broadcast {
161                        ConstellationBroadcast::Status(constellation_status) => {
162                            self.constellation_status = constellation_status;
163                            self.check_ready();
164                        }
165                    },
166                    StarCommand::InvokeProtoStarEvolution => {
167                        let star_key = match self.star_key
168                        {
169                            ProtoStarKey::Key(star_key) => star_key,
170                            _ => panic!("proto star not ready for proto star evolution because it does not have a star_key yet assigned")
171                        };
172
173                        let info = StarInfo {
174                            key: star_key,
175                            kind: self.kind.clone(),
176                        };
177
178                        let (core_messaging_endpoint_tx, core_messaging_endpoint_rx) =
179                            mpsc::channel(1024);
180                        let (resource_locator_tx, resource_locator_rx) = mpsc::channel(1024);
181                        let (star_locator_tx, star_locator_rx) = mpsc::channel(1024);
182                        let (messaging_tx, messaging_rx) = mpsc::channel(1024);
183                        let (golden_path_tx, golden_path_rx) = mpsc::channel(1024);
184                        let (variant_tx, variant_rx) = mpsc::channel(1024);
185                        let (watch_tx, watch_rx) = mpsc::channel(1024);
186
187                        let resource_locator_api = ResourceLocatorApi::new(resource_locator_tx);
188                        let star_search_api = StarSearchApi::new(star_locator_tx);
189                        let router_api = RouterApi::new(self.router_tx);
190                        let messaging_api = MessagingApi::new(messaging_tx);
191                        let golden_path_api = GoldenPathApi::new(golden_path_tx);
192                        let variant_api = VariantApi::new(variant_tx);
193                        let watch_api = WatchApi::new(watch_tx);
194
195
196                        let data_access = self
197                            .data_access
198                            .with_path(format!("stars/{}", info.key.to_string()))?;
199
200                        let resource_registry: Option<Arc<dyn ResourceRegistryBacking>> =
201                            if info.kind.is_resource_manager() {
202                                Option::Some(Arc::new(
203                                    ResourceRegistryBackingSqLite::new(
204                                        info.clone(),
205                                        data_access.path(),
206                                    )
207                                    .await?,
208                                ))
209                            } else {
210                                Option::None
211                            };
212
213                        let star_handler: Option<StarWranglerBacking> =
214                            if !info.kind.conscripts().is_empty() {
215                                Option::Some(StarWranglerBacking::new(self.star_tx.clone()).await)
216                            } else {
217                                Option::None
218                            };
219
220                        let skel = StarSkel {
221                            info: info,
222                            sequence: self.sequence.clone(),
223                            star_tx: self.star_tx.clone(),
224                            core_messaging_endpoint_tx: core_messaging_endpoint_tx.clone(),
225                            logger: self.logger.clone(),
226                            flags: self.flags.clone(),
227                            registry: resource_registry,
228                            star_handler: star_handler,
229                            persistence: Persistence::Memory,
230                            data_access: data_access,
231                            machine: self.machine.clone(),
232                            surface_api: self.surface_api,
233                            resource_locator_api,
234                            star_search_api,
235                            router_api,
236                            messaging_api,
237                            lane_muxer_api: self.lane_muxer_api,
238                            golden_path_api,
239                            variant_api,
240                            watch_api
241                        };
242
243                        start_variant(skel.clone(), variant_rx );
244
245                        MessagingEndpointComponent::start(skel.clone(), core_messaging_endpoint_rx);
246                        ResourceLocatorComponent::start(skel.clone(), resource_locator_rx);
247                        StarSearchComponent::start(skel.clone(), star_locator_rx);
248                        RouterComponent::start(skel.clone(), self.router_booster_rx.router_rx);
249                        MessagingComponent::start(skel.clone(), messaging_rx);
250                        SurfaceComponent::start(skel.clone(), self.surface_rx);
251                        GoldenPathComponent::start(skel.clone(), golden_path_rx);
252                        WatchComponent::start(skel.clone(), watch_rx);
253
254                        return Ok(Star::from_proto(
255                            skel,
256                            self.star_rx,
257                            core_messaging_endpoint_tx,
258                            self.lanes,
259                            self.proto_lanes,
260                            self.connector_ctrls,
261                            self.frame_hold,
262                        )
263                        .await);
264                    }
265                    StarCommand::AddProtoLaneEndpoint(lane) => {
266                        match &self.star_key {
267                            ProtoStarKey::Key(star_key) => {
268                                lane.outgoing
269                                    .out_tx
270                                    .send(LaneCommand::Frame(Frame::Proto(
271                                        ProtoFrame::ReportStarKey(star_key.clone()),
272                                    )))
273                                    .await?;
274                            }
275                            ProtoStarKey::RequestSubKeyExpansion(_index) => {
276                                lane.outgoing
277                                    .out_tx
278                                    .send(LaneCommand::Frame(Frame::Proto(
279                                        ProtoFrame::GatewaySelect,
280                                    )))
281                                    .await?;
282                            }
283                        }
284
285                        self.lane_muxer_api.add_proto_lane(lane, StarPattern::Any );
286
287
288                    }
289                    StarCommand::Frame(Frame::Proto( ProtoFrame::GatewayAssign(subgraph))) => {
290                        if let ProtoStarKey::RequestSubKeyExpansion(index) = self.star_key {
291                            let star_key = StarKey::new_with_subgraph(subgraph.clone(), index);
292                            self.star_key = ProtoStarKey::Key(star_key.clone());
293                            self.lane_muxer_api.broadcast(Frame::Proto(
294                                ProtoFrame::ReportStarKey(star_key.clone()),
295                            ), LanePattern::Any );
296
297                            self.lane_muxer_api.broadcast(Frame::Proto(
298                                ProtoFrame::GatewayAssign(subgraph),
299                            ), LanePattern::Protos );
300                            self.check_ready();
301                        } else {
302                            eprintln!("not expecting a GatewayAssign for this ProtoStarKey which is already assigned.")
303                        }
304                    }
305                    StarCommand::AddConnectorController(connector_ctrl) => {
306                        self.connector_ctrls.push(connector_ctrl);
307                    }
308                    StarCommand::AddLogger(_logger) => {
309                        //                        self.logger =
310                    }
311                    _ => {
312                        eprintln!("not implemented");
313                    }
314                }
315            } else {
316                //            return Err("command_rx has been disconnected".into());
317            }
318        }
319    }
320
321    fn check_ready(&mut self) {
322        if self.constellation_status == ConstellationStatus::Assembled && self.star_key.is_some() {
323            let star_tx = self.star_tx.clone();
324            tokio::spawn(async move {
325                star_tx.send(StarCommand::InvokeProtoStarEvolution).await;
326            });
327        }
328    }
329
330
331}
332
333pub struct ProtoStarEvolution {
334    pub star: StarKey,
335    pub controller: StarController,
336}
337
338pub struct ProtoStarController {
339    command_tx: Sender<StarCommand>,
340}
341
342#[derive(Clone)]
343pub enum ProtoStarKernel {
344    Central,
345    Mesh,
346    Supervisor,
347    Server,
348    Gateway,
349}
350
351impl ProtoStarKernel {
352    fn evolve(&self) -> Result<Box<dyn StarKernel>, Error> {
353        Ok(Box::new(PlaceholderKernel::new()))
354    }
355}
356
357pub struct PlaceholderKernel {}
358
359impl PlaceholderKernel {
360    pub fn new() -> Self {
361        PlaceholderKernel {}
362    }
363}
364
365impl StarKernel for PlaceholderKernel {}
366
367pub struct ProtoTunnel {
368    pub tx: Sender<Frame>,
369    pub rx: Receiver<Frame>,
370}
371
372impl ProtoTunnel {
373    pub async fn evolve(mut self) -> Result<(TunnelOut, TunnelIn), Error> {
374        self.tx
375            .send(Frame::Proto(ProtoFrame::StarLaneProtocolVersion(
376                STARLANE_PROTOCOL_VERSION,
377            )))
378            .await;
379
380        // first we confirm that the version is as expected
381        if let Option::Some(Frame::Proto(recv)) = self.rx.recv().await {
382            match recv {
383                ProtoFrame::StarLaneProtocolVersion(version)
384                    if version == STARLANE_PROTOCOL_VERSION =>
385                {
386                    // do nothing... we move onto the next step
387                    return Ok((
388                        TunnelOut {
389                            //                            remote_star: remote_star_key.clone(),
390                            tx: self.tx,
391                        },
392                        TunnelIn {
393                            //                            remote_star: remote_star_key.clone(),
394                            rx: self.rx,
395                        },
396                    ));
397                }
398                ProtoFrame::StarLaneProtocolVersion(version) => {
399                    return Err(format!("wrong version: {}", version).into());
400                }
401                gram => {
402                    return Err(format!("unexpected star gram: {} (expected to receive StarLaneProtocolVersion first)", gram).into());
403                }
404            }
405        } else {
406            return Err("disconnected".into());
407        }
408
409        if let Option::Some(Frame::Proto(recv)) = self.rx.recv().await {
410            match recv {
411                ProtoFrame::ReportStarKey(_remote_star_key) => {
412                    return Ok((
413                        TunnelOut {
414                            //                            remote_star: remote_star_key.clone(),
415                            tx: self.tx,
416                        },
417                        TunnelIn {
418                            //                            remote_star: remote_star_key.clone(),
419                            rx: self.rx,
420                        },
421                    ));
422                }
423                frame => {
424                    return Err(format!(
425                        "unexpected star gram: {} (expected to receive ReportStarKey next)",
426                        frame
427                    )
428                    .into());
429                }
430            };
431        } else {
432            return Err("disconnected!".into());
433        }
434    }
435}
436
437pub fn local_tunnels() -> (ProtoTunnel, ProtoTunnel) {
438    let (atx, arx) = mpsc::channel::<Frame>(32);
439    let (btx, brx) = mpsc::channel::<Frame>(32);
440
441    (
442        ProtoTunnel { tx: atx, rx: brx },
443        ProtoTunnel { tx: btx, rx: arx },
444    )
445}
446
447struct ProtoTrackerCase {
448    frame: Frame,
449    instant: Instant,
450    expect: fn(&Frame) -> bool,
451    retries: usize,
452}
453
454impl ProtoTrackerCase {
455    pub fn reset(&mut self) {
456        self.instant = Instant::now();
457    }
458}
459
460struct ProtoTracker {
461    case: Option<ProtoTrackerCase>,
462}
463
464impl ProtoTracker {
465    pub fn new() -> Self {
466        ProtoTracker { case: Option::None }
467    }
468
469    pub fn track(&mut self, frame: Frame, expect: fn(&Frame) -> bool) {
470        self.case = Option::Some(ProtoTrackerCase {
471            frame: frame,
472            instant: Instant::now(),
473            expect: expect,
474            retries: 0,
475        });
476    }
477
478    pub fn process(&mut self, frame: &Frame) {
479        if let Option::Some(case) = &self.case {
480            if (case.expect)(frame) {
481                self.case = Option::None;
482            }
483        }
484    }
485
486    pub fn has_expectation(&self) -> bool {
487        return self.case.is_some();
488    }
489
490    pub async fn check(&mut self) -> Option<StarCommand> {
491        if let Option::Some(case) = &mut self.case {
492            let now = Instant::now();
493            let seconds = 5 - (now.duration_since(case.instant).as_secs() as i64);
494            if seconds > 0 {
495                let duration = Duration::from_secs(seconds as u64);
496                tokio::time::sleep(duration).await;
497            }
498
499            case.retries = case.retries + 1;
500
501            case.reset();
502
503            Option::Some(StarCommand::FrameTimeout(FrameTimeoutInner {
504                frame: case.frame.clone(),
505                retries: case.retries,
506            }))
507        } else {
508            Option::None
509        }
510    }
511}
512
513pub enum LaneToCentralState {
514    Found(LaneToCentral),
515    None,
516}
517
518pub struct LaneToCentral {
519    remote_star: StarKey,
520    hops: usize,
521}
522
523#[derive(Clone)]
524pub enum ProtoStarKey {
525    Key(StarKey),
526    RequestSubKeyExpansion(StarKeyConstellationIndex),
527}
528
529impl ProtoStarKey {
530    pub fn is_some(&self) -> bool {
531        match self {
532            ProtoStarKey::Key(_) => true,
533            ProtoStarKey::RequestSubKeyExpansion(_) => false,
534        }
535    }
536}
537
538struct RouterCallBooster {
539    router_rx: mpsc::Receiver<RouterCall>
540}
541
542impl RouterCallBooster {
543    pub async fn boost(&mut self) -> Option<StarCommand> {
544        loop {
545            let call = self.router_rx.recv().await;
546
547            match call {
548                None => {
549                    return Option::None;
550                },
551                Some(call) => {
552                    match call {
553                        RouterCall::Frame { frame, session: lane } => {
554                            return Option::Some(StarCommand::Frame(frame));
555                        }
556                        _ => {
557                            // do nothing
558                        }
559                    }
560                }
561            }
562        }
563    }
564}
565
566