1use std::cell::Cell;
2use std::collections::{HashMap, HashSet};
3use std::convert::TryInto;
4use std::sync::atomic::{AtomicI32, AtomicI64, AtomicU64, Ordering};
5use std::sync::Arc;
6
7use futures::future::select_all;
8use futures::prelude::*;
9use futures::FutureExt;
10use tokio::sync::mpsc::{Receiver, Sender};
11use tokio::sync::{broadcast, mpsc};
12use tokio::time::{Duration, Instant};
13
14use crate::cache::ProtoArtifactCachesFactory;
15use crate::constellation::ConstellationStatus;
16use crate::error::Error;
17use crate::file_access::FileAccess;
18use crate::frame::{
19 Frame, ProtoFrame, SequenceMessage, StarMessage, StarMessagePayload, StarPattern, SearchWindDown,
20 SearchHit, SearchWindUp,
21};
22use crate::lane::{
23 ConnectorController, LaneCommand, LaneEnd, LaneIndex, LaneMeta, LaneWrapper,
24 ProtoLaneEnd, TunnelConnector, TunnelIn, TunnelOut, TunnelOutState,
25 STARLANE_PROTOCOL_VERSION,
26};
27use crate::logger::{Flag, Flags, Log, Logger, ProtoStarLog, ProtoStarLogPayload, StarFlag};
28use crate::permissions::AuthTokenSource;
29use crate::star::core::message::MessagingEndpointComponent;
30use crate::star::shell::lanes::{LaneMuxerApi, LaneMuxer, LanePattern};
31use crate::star::shell::search::{StarSearchApi, StarSearchComponent, StarSearchTransaction, ShortestPathStarKey};
32use crate::star::shell::message::{MessagingApi, MessagingComponent};
33use crate::star::shell::pledge::StarWranglerBacking;
34use crate::star::shell::router::{RouterApi, RouterComponent, RouterCall};
35use crate::star::surface::{SurfaceApi, SurfaceCall, SurfaceComponent};
36use crate::star::variant::{VariantApi, start_variant};
37use crate::star::{
38 ConstellationBroadcast, FrameHold, FrameTimeoutInner, Persistence, ResourceRegistryBacking,
39 ResourceRegistryBackingSqLite, Star, StarCommand, StarController,
40 StarInfo, StarKernel, StarKey, StarKind, StarSkel,
41};
42use crate::starlane::StarlaneMachine;
43use crate::template::StarKeyConstellationIndex;
44use crate::star::shell::locator::{ResourceLocatorApi, ResourceLocatorComponent};
45use crate::star::shell::golden::{GoldenPathApi, GoldenPathComponent};
46use crate::star::shell::watch::{WatchApi, WatchComponent};
47
48
49pub struct ProtoStar {
50 star_key: ProtoStarKey,
51 sequence: Arc<AtomicU64>,
52 kind: StarKind,
53 star_tx: mpsc::Sender<StarCommand>,
54 star_rx: mpsc::Receiver<StarCommand>,
55 surface_api: SurfaceApi,
56 surface_rx: mpsc::Receiver<SurfaceCall>,
57 lanes: HashMap<StarKey, LaneWrapper>,
58 proto_lanes: Vec<LaneWrapper>,
59 connector_ctrls: Vec<ConnectorController>,
60 logger: Logger,
62 frame_hold: FrameHold,
63 data_access: FileAccess,
64 proto_constellation_broadcast: Cell<Option<broadcast::Receiver<ConstellationBroadcast>>>,
65 constellation_status: ConstellationStatus,
66 flags: Flags,
67 tracker: ProtoTracker,
68 machine: StarlaneMachine,
69 lane_muxer_api: LaneMuxerApi,
70 router_tx: mpsc::Sender<RouterCall>,
71 router_booster_rx: RouterCallBooster
72}
73
74impl ProtoStar {
75 pub fn new(
76 key: ProtoStarKey,
77 kind: StarKind,
78 star_tx: Sender<StarCommand>,
79 star_rx: Receiver<StarCommand>,
80 surface_api: SurfaceApi,
81 surface_rx: mpsc::Receiver<SurfaceCall>,
82 data_access: FileAccess,
83 proto_constellation_broadcast: broadcast::Receiver<ConstellationBroadcast>,
84 flags: Flags,
85 logger: Logger,
86 machine: StarlaneMachine,
87 ) -> (Self, StarController) {
88 let (router_tx,router_rx) = mpsc::channel(1024);
89 let router_booster_rx = RouterCallBooster { router_rx };
90 let lane_muxer_api = LaneMuxer::start(router_tx.clone());
91 (
92
93 ProtoStar {
94 star_key: key,
95 sequence: Arc::new(AtomicU64::new(0)),
96 kind,
97 star_tx: star_tx.clone(),
98 star_rx,
99 lanes: HashMap::new(),
100 proto_lanes: vec![],
101 connector_ctrls: vec![],
102 logger: logger,
103 frame_hold: FrameHold::new(),
104 data_access: data_access,
105 proto_constellation_broadcast: Cell::new(Option::Some(
106 proto_constellation_broadcast,
107 )),
108 tracker: ProtoTracker::new(),
109 flags: flags,
110 constellation_status: ConstellationStatus::Unknown,
111 machine: machine,
112 surface_api: surface_api.clone(),
113 surface_rx,
114 lane_muxer_api,
115 router_tx,
116 router_booster_rx
117 },
118 StarController {
119 star_tx,
120 surface_api,
121 },
122 )
123 }
124
125 pub async fn evolve(mut self) -> Result<Star, Error> {
126 let mut proto_constellation_broadcast = self
127 .proto_constellation_broadcast
128 .replace(Option::None)
129 .ok_or("expected proto_constellation_broadcast to be Option::Some()")?;
130
131 let star_tx = self.star_tx.clone();
132 tokio::spawn(async move {
133 while let Result::Ok(broadcast) = proto_constellation_broadcast.recv().await {
134 star_tx
135 .send(StarCommand::ConstellationBroadcast(broadcast))
136 .await;
137 }
138 });
139
140 loop {
141
142 let mut futures = vec![];
143 futures.push(self.star_rx.recv().boxed() );
144 futures.push(self.router_booster_rx.boost().boxed());
145 let (call,_,_) = select_all(futures).await;
146
147 if let Some(call) = call{
148 match call {
149 StarCommand::GetStarInfo(tx) => match &self.star_key {
150 ProtoStarKey::Key(key) => {
151 tx.send(Option::Some(StarInfo {
152 key: key.clone(),
153 kind: self.kind.clone(),
154 }));
155 }
156 ProtoStarKey::RequestSubKeyExpansion(_) => {
157 tx.send(Option::None);
158 }
159 },
160 StarCommand::ConstellationBroadcast(broadcast) => match broadcast {
161 ConstellationBroadcast::Status(constellation_status) => {
162 self.constellation_status = constellation_status;
163 self.check_ready();
164 }
165 },
166 StarCommand::InvokeProtoStarEvolution => {
167 let star_key = match self.star_key
168 {
169 ProtoStarKey::Key(star_key) => star_key,
170 _ => panic!("proto star not ready for proto star evolution because it does not have a star_key yet assigned")
171 };
172
173 let info = StarInfo {
174 key: star_key,
175 kind: self.kind.clone(),
176 };
177
178 let (core_messaging_endpoint_tx, core_messaging_endpoint_rx) =
179 mpsc::channel(1024);
180 let (resource_locator_tx, resource_locator_rx) = mpsc::channel(1024);
181 let (star_locator_tx, star_locator_rx) = mpsc::channel(1024);
182 let (messaging_tx, messaging_rx) = mpsc::channel(1024);
183 let (golden_path_tx, golden_path_rx) = mpsc::channel(1024);
184 let (variant_tx, variant_rx) = mpsc::channel(1024);
185 let (watch_tx, watch_rx) = mpsc::channel(1024);
186
187 let resource_locator_api = ResourceLocatorApi::new(resource_locator_tx);
188 let star_search_api = StarSearchApi::new(star_locator_tx);
189 let router_api = RouterApi::new(self.router_tx);
190 let messaging_api = MessagingApi::new(messaging_tx);
191 let golden_path_api = GoldenPathApi::new(golden_path_tx);
192 let variant_api = VariantApi::new(variant_tx);
193 let watch_api = WatchApi::new(watch_tx);
194
195
196 let data_access = self
197 .data_access
198 .with_path(format!("stars/{}", info.key.to_string()))?;
199
200 let resource_registry: Option<Arc<dyn ResourceRegistryBacking>> =
201 if info.kind.is_resource_manager() {
202 Option::Some(Arc::new(
203 ResourceRegistryBackingSqLite::new(
204 info.clone(),
205 data_access.path(),
206 )
207 .await?,
208 ))
209 } else {
210 Option::None
211 };
212
213 let star_handler: Option<StarWranglerBacking> =
214 if !info.kind.conscripts().is_empty() {
215 Option::Some(StarWranglerBacking::new(self.star_tx.clone()).await)
216 } else {
217 Option::None
218 };
219
220 let skel = StarSkel {
221 info: info,
222 sequence: self.sequence.clone(),
223 star_tx: self.star_tx.clone(),
224 core_messaging_endpoint_tx: core_messaging_endpoint_tx.clone(),
225 logger: self.logger.clone(),
226 flags: self.flags.clone(),
227 registry: resource_registry,
228 star_handler: star_handler,
229 persistence: Persistence::Memory,
230 data_access: data_access,
231 machine: self.machine.clone(),
232 surface_api: self.surface_api,
233 resource_locator_api,
234 star_search_api,
235 router_api,
236 messaging_api,
237 lane_muxer_api: self.lane_muxer_api,
238 golden_path_api,
239 variant_api,
240 watch_api
241 };
242
243 start_variant(skel.clone(), variant_rx );
244
245 MessagingEndpointComponent::start(skel.clone(), core_messaging_endpoint_rx);
246 ResourceLocatorComponent::start(skel.clone(), resource_locator_rx);
247 StarSearchComponent::start(skel.clone(), star_locator_rx);
248 RouterComponent::start(skel.clone(), self.router_booster_rx.router_rx);
249 MessagingComponent::start(skel.clone(), messaging_rx);
250 SurfaceComponent::start(skel.clone(), self.surface_rx);
251 GoldenPathComponent::start(skel.clone(), golden_path_rx);
252 WatchComponent::start(skel.clone(), watch_rx);
253
254 return Ok(Star::from_proto(
255 skel,
256 self.star_rx,
257 core_messaging_endpoint_tx,
258 self.lanes,
259 self.proto_lanes,
260 self.connector_ctrls,
261 self.frame_hold,
262 )
263 .await);
264 }
265 StarCommand::AddProtoLaneEndpoint(lane) => {
266 match &self.star_key {
267 ProtoStarKey::Key(star_key) => {
268 lane.outgoing
269 .out_tx
270 .send(LaneCommand::Frame(Frame::Proto(
271 ProtoFrame::ReportStarKey(star_key.clone()),
272 )))
273 .await?;
274 }
275 ProtoStarKey::RequestSubKeyExpansion(_index) => {
276 lane.outgoing
277 .out_tx
278 .send(LaneCommand::Frame(Frame::Proto(
279 ProtoFrame::GatewaySelect,
280 )))
281 .await?;
282 }
283 }
284
285 self.lane_muxer_api.add_proto_lane(lane, StarPattern::Any );
286
287
288 }
289 StarCommand::Frame(Frame::Proto( ProtoFrame::GatewayAssign(subgraph))) => {
290 if let ProtoStarKey::RequestSubKeyExpansion(index) = self.star_key {
291 let star_key = StarKey::new_with_subgraph(subgraph.clone(), index);
292 self.star_key = ProtoStarKey::Key(star_key.clone());
293 self.lane_muxer_api.broadcast(Frame::Proto(
294 ProtoFrame::ReportStarKey(star_key.clone()),
295 ), LanePattern::Any );
296
297 self.lane_muxer_api.broadcast(Frame::Proto(
298 ProtoFrame::GatewayAssign(subgraph),
299 ), LanePattern::Protos );
300 self.check_ready();
301 } else {
302 eprintln!("not expecting a GatewayAssign for this ProtoStarKey which is already assigned.")
303 }
304 }
305 StarCommand::AddConnectorController(connector_ctrl) => {
306 self.connector_ctrls.push(connector_ctrl);
307 }
308 StarCommand::AddLogger(_logger) => {
309 }
311 _ => {
312 eprintln!("not implemented");
313 }
314 }
315 } else {
316 }
318 }
319 }
320
321 fn check_ready(&mut self) {
322 if self.constellation_status == ConstellationStatus::Assembled && self.star_key.is_some() {
323 let star_tx = self.star_tx.clone();
324 tokio::spawn(async move {
325 star_tx.send(StarCommand::InvokeProtoStarEvolution).await;
326 });
327 }
328 }
329
330
331}
332
333pub struct ProtoStarEvolution {
334 pub star: StarKey,
335 pub controller: StarController,
336}
337
338pub struct ProtoStarController {
339 command_tx: Sender<StarCommand>,
340}
341
342#[derive(Clone)]
343pub enum ProtoStarKernel {
344 Central,
345 Mesh,
346 Supervisor,
347 Server,
348 Gateway,
349}
350
351impl ProtoStarKernel {
352 fn evolve(&self) -> Result<Box<dyn StarKernel>, Error> {
353 Ok(Box::new(PlaceholderKernel::new()))
354 }
355}
356
357pub struct PlaceholderKernel {}
358
359impl PlaceholderKernel {
360 pub fn new() -> Self {
361 PlaceholderKernel {}
362 }
363}
364
365impl StarKernel for PlaceholderKernel {}
366
367pub struct ProtoTunnel {
368 pub tx: Sender<Frame>,
369 pub rx: Receiver<Frame>,
370}
371
372impl ProtoTunnel {
373 pub async fn evolve(mut self) -> Result<(TunnelOut, TunnelIn), Error> {
374 self.tx
375 .send(Frame::Proto(ProtoFrame::StarLaneProtocolVersion(
376 STARLANE_PROTOCOL_VERSION,
377 )))
378 .await;
379
380 if let Option::Some(Frame::Proto(recv)) = self.rx.recv().await {
382 match recv {
383 ProtoFrame::StarLaneProtocolVersion(version)
384 if version == STARLANE_PROTOCOL_VERSION =>
385 {
386 return Ok((
388 TunnelOut {
389 tx: self.tx,
391 },
392 TunnelIn {
393 rx: self.rx,
395 },
396 ));
397 }
398 ProtoFrame::StarLaneProtocolVersion(version) => {
399 return Err(format!("wrong version: {}", version).into());
400 }
401 gram => {
402 return Err(format!("unexpected star gram: {} (expected to receive StarLaneProtocolVersion first)", gram).into());
403 }
404 }
405 } else {
406 return Err("disconnected".into());
407 }
408
409 if let Option::Some(Frame::Proto(recv)) = self.rx.recv().await {
410 match recv {
411 ProtoFrame::ReportStarKey(_remote_star_key) => {
412 return Ok((
413 TunnelOut {
414 tx: self.tx,
416 },
417 TunnelIn {
418 rx: self.rx,
420 },
421 ));
422 }
423 frame => {
424 return Err(format!(
425 "unexpected star gram: {} (expected to receive ReportStarKey next)",
426 frame
427 )
428 .into());
429 }
430 };
431 } else {
432 return Err("disconnected!".into());
433 }
434 }
435}
436
437pub fn local_tunnels() -> (ProtoTunnel, ProtoTunnel) {
438 let (atx, arx) = mpsc::channel::<Frame>(32);
439 let (btx, brx) = mpsc::channel::<Frame>(32);
440
441 (
442 ProtoTunnel { tx: atx, rx: brx },
443 ProtoTunnel { tx: btx, rx: arx },
444 )
445}
446
447struct ProtoTrackerCase {
448 frame: Frame,
449 instant: Instant,
450 expect: fn(&Frame) -> bool,
451 retries: usize,
452}
453
454impl ProtoTrackerCase {
455 pub fn reset(&mut self) {
456 self.instant = Instant::now();
457 }
458}
459
460struct ProtoTracker {
461 case: Option<ProtoTrackerCase>,
462}
463
464impl ProtoTracker {
465 pub fn new() -> Self {
466 ProtoTracker { case: Option::None }
467 }
468
469 pub fn track(&mut self, frame: Frame, expect: fn(&Frame) -> bool) {
470 self.case = Option::Some(ProtoTrackerCase {
471 frame: frame,
472 instant: Instant::now(),
473 expect: expect,
474 retries: 0,
475 });
476 }
477
478 pub fn process(&mut self, frame: &Frame) {
479 if let Option::Some(case) = &self.case {
480 if (case.expect)(frame) {
481 self.case = Option::None;
482 }
483 }
484 }
485
486 pub fn has_expectation(&self) -> bool {
487 return self.case.is_some();
488 }
489
490 pub async fn check(&mut self) -> Option<StarCommand> {
491 if let Option::Some(case) = &mut self.case {
492 let now = Instant::now();
493 let seconds = 5 - (now.duration_since(case.instant).as_secs() as i64);
494 if seconds > 0 {
495 let duration = Duration::from_secs(seconds as u64);
496 tokio::time::sleep(duration).await;
497 }
498
499 case.retries = case.retries + 1;
500
501 case.reset();
502
503 Option::Some(StarCommand::FrameTimeout(FrameTimeoutInner {
504 frame: case.frame.clone(),
505 retries: case.retries,
506 }))
507 } else {
508 Option::None
509 }
510 }
511}
512
513pub enum LaneToCentralState {
514 Found(LaneToCentral),
515 None,
516}
517
518pub struct LaneToCentral {
519 remote_star: StarKey,
520 hops: usize,
521}
522
523#[derive(Clone)]
524pub enum ProtoStarKey {
525 Key(StarKey),
526 RequestSubKeyExpansion(StarKeyConstellationIndex),
527}
528
529impl ProtoStarKey {
530 pub fn is_some(&self) -> bool {
531 match self {
532 ProtoStarKey::Key(_) => true,
533 ProtoStarKey::RequestSubKeyExpansion(_) => false,
534 }
535 }
536}
537
538struct RouterCallBooster {
539 router_rx: mpsc::Receiver<RouterCall>
540}
541
542impl RouterCallBooster {
543 pub async fn boost(&mut self) -> Option<StarCommand> {
544 loop {
545 let call = self.router_rx.recv().await;
546
547 match call {
548 None => {
549 return Option::None;
550 },
551 Some(call) => {
552 match call {
553 RouterCall::Frame { frame, session: lane } => {
554 return Option::Some(StarCommand::Frame(frame));
555 }
556 _ => {
557 }
559 }
560 }
561 }
562 }
563 }
564}
565
566