starlane_core/
starlane.rs

1use std::cell::Cell;
2
3use std::collections::HashMap;
4
5use std::sync::{Arc, Mutex};
6
7use std::time::Duration;
8
9use futures::future::join_all;
10use futures::{FutureExt, StreamExt};
11
12use serde::{Deserialize, Serialize};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::sync::oneshot;
15use tokio::sync::{broadcast, mpsc};
16
17use crate::cache::ProtoArtifactCachesFactory;
18use crate::constellation::{Constellation, ConstellationStatus};
19use crate::error::Error;
20use crate::file_access::FileAccess;
21
22use crate::lane::{ClientSideTunnelConnector, LocalTunnelConnector, ProtoLaneEnd, ServerSideTunnelConnector, OnCloseAction};
23use crate::logger::{Flags, Logger};
24
25use crate::proto::{
26    local_tunnels, ProtoStar, ProtoStarController, ProtoStarEvolution, ProtoTunnel,
27};
28
29use starlane_resources::data::BinContext;
30use crate::star::surface::SurfaceApi;
31use crate::star::{ConstellationBroadcast, StarKind, StarStatus};
32use crate::star::{Request, Star, StarCommand, StarController, StarInfo, StarKey, StarTemplateId};
33use crate::starlane::api::StarlaneApi;
34use crate::starlane::files::MachineFileSystem;
35use crate::template::{
36    ConstellationData, ConstellationLayout, ConstellationSelector, ConstellationTemplate,
37    ConstellationTemplateHandle, MachineName, StarInConstellationTemplateHandle,
38    StarInConstellationTemplateSelector, StarKeyConstellationIndexTemplate,
39    StarKeySubgraphTemplate, StarKeyTemplate, StarSelector, StarTemplate, StarTemplateHandle,
40};
41use crate::util::AsyncHashMap;
42
43pub mod api;
44pub mod files;
45
46lazy_static! {
47//    pub static ref DATA_DIR: Mutex<String> = Mutex::new("data".to_string());
48    pub static ref DEFAULT_PORT: usize = std::env::var("STARLANE_PORT").unwrap_or("4343".to_string()).parse::<usize>().unwrap_or(4343);
49
50    pub static ref VERSION: VersionFrame = VersionFrame{ product: "Starlane".to_string(), version: "1.0.0".to_string() };
51}
52
53#[derive(Clone)]
54pub struct StarlaneMachine {
55    tx: mpsc::Sender<StarlaneCommand>,
56    run_complete_signal_tx: broadcast::Sender<()>,
57    machine_filesystem: Arc<MachineFileSystem>,
58}
59
60impl StarlaneMachine {
61    pub fn new(name: MachineName) -> Result<Self, Error> {
62        Self::new_with_artifact_caches(name, Option::None)
63    }
64
65    pub fn new_with_artifact_caches(
66        name: MachineName,
67        artifact_caches: Option<Arc<ProtoArtifactCachesFactory>>,
68    ) -> Result<Self, Error> {
69        let runner = StarlaneMachineRunner::new_with_artifact_caches(name, artifact_caches)?;
70        let tx = runner.command_tx.clone();
71        let run_complete_signal_tx = runner.run();
72        let starlane = Self {
73            tx: tx,
74            run_complete_signal_tx: run_complete_signal_tx,
75            machine_filesystem: Arc::new(MachineFileSystem::new()?),
76        };
77
78        Result::Ok(starlane)
79    }
80
81    pub async fn get_proto_artifact_caches_factory(
82        &self,
83    ) -> Result<Arc<ProtoArtifactCachesFactory>,Error> {
84        let (tx, rx) = oneshot::channel();
85        self.tx
86            .send(StarlaneCommand::GetProtoArtifactCachesFactory(tx))
87            .await?;
88        Ok(rx.await?.ok_or("expected proto artifact cache")?)
89
90    }
91
92    pub fn bin_context(&self) -> Arc<dyn BinContext> {
93        self.machine_filesystem.clone()
94    }
95
96    pub fn machine_filesystem(&self) -> Arc<MachineFileSystem> {
97        self.machine_filesystem.clone()
98    }
99
100    pub fn shutdown(&self) {
101        let tx = self.tx.clone();
102        tokio::spawn(async move {
103            tx.send(StarlaneCommand::Shutdown).await;
104        });
105    }
106
107    pub async fn create_constellation(
108        &self,
109        name: &str,
110        layout: ConstellationLayout,
111    ) -> Result<(), Error> {
112        let name = name.to_string();
113        let (tx, rx) = oneshot::channel();
114        let create = ConstellationCreate {
115            name,
116            layout,
117            tx,
118            machine: self.clone(),
119        };
120
121        self.tx
122            .send(StarlaneCommand::ConstellationCreate(create))
123            .await?;
124        rx.await?
125    }
126
127    pub async fn get_starlane_api(&self) -> Result<StarlaneApi, Error> {
128        let (tx, rx) = oneshot::channel();
129        self.tx
130            .send(StarlaneCommand::StarlaneApiSelectBest(tx))
131            .await?;
132        rx.await?
133    }
134
135    pub async fn listen(&self) -> Result<(), Error> {
136        let command_tx = self.tx.clone();
137        let (tx, rx) = oneshot::channel();
138        command_tx.send(StarlaneCommand::Listen(tx)).await;
139        rx.await?
140    }
141
142    pub async fn join(self) {
143        let mut run_complete_signal_rx = self.run_complete_signal_tx.subscribe();
144        join!(run_complete_signal_rx.recv());
145    }
146}
147
148pub struct StarlaneMachineRunner {
149    name: MachineName,
150    pub command_tx: mpsc::Sender<StarlaneCommand>,
151    command_rx: mpsc::Receiver<StarlaneCommand>,
152    star_controllers: AsyncHashMap<StarInConstellationTemplateHandle, StarController>,
153    //    star_core_ext_factory: Arc<dyn StarCoreExtFactory>,
154    data_access: FileAccess,
155    cache_access: FileAccess,
156    pub logger: Logger,
157    pub flags: Flags,
158    pub artifact_caches: Option<Arc<ProtoArtifactCachesFactory>>,
159    constellations: HashMap<String, Constellation>,
160    port: usize,
161    inner_flags: Arc<Mutex<Cell<StarlaneInnerFlags>>>,
162}
163
164impl StarlaneMachineRunner {
165    pub fn new(machine: String) -> Result<Self, Error> {
166        Self::new_with_artifact_caches(machine, Option::None)
167    }
168
169    pub fn new_with_artifact_caches(
170        machine: String,
171        artifact_caches: Option<Arc<ProtoArtifactCachesFactory>>,
172    ) -> Result<Self, Error> {
173        let (command_tx, command_rx) = mpsc::channel(32);
174        Ok(StarlaneMachineRunner {
175            name: machine,
176            star_controllers: AsyncHashMap::new(),
177            command_tx,
178            command_rx,
179            //            star_core_ext_factory: Arc::new(ExampleStarCoreExtFactory::new() ),
180            logger: Logger::new(),
181            flags: Flags::new(),
182            data_access: FileAccess::new(
183                std::env::var("STARLANE_DATA").unwrap_or("data".to_string()),
184            )?,
185            cache_access: FileAccess::new(
186                std::env::var("STARLANE_CACHE").unwrap_or("cache".to_string()),
187            )?,
188            artifact_caches: artifact_caches,
189            port: DEFAULT_PORT.clone(),
190            constellations: HashMap::new(),
191            inner_flags: Arc::new(Mutex::new(Cell::new(StarlaneInnerFlags::new()))),
192        })
193    }
194
195    pub fn run(mut self) -> broadcast::Sender<()> {
196        let command_tx = self.command_tx.clone();
197        tokio::spawn(async move {
198            let mut shutdown_rx = crate::util::SHUTDOWN_TX.subscribe();
199            shutdown_rx.recv().await;
200            command_tx.try_send(StarlaneCommand::Shutdown);
201        });
202
203        let (run_complete_signal_tx, _) = broadcast::channel(1);
204        let run_complete_signal_tx_rtn = run_complete_signal_tx.clone();
205
206        tokio::spawn(async move {
207            while let Option::Some(command) = self.command_rx.recv().await {
208                match command {
209                    StarlaneCommand::ConstellationCreate(command) => {
210                        let result = self
211                            .constellation_create(command.layout, command.name, command.machine)
212                            .await;
213
214                        //sleep(Duration::from_secs(10)).await;
215                        if let Err(error) = &result {
216                            error!("CONSTELLATION CREATE ERROR: {}", error.to_string());
217                        }
218                        command.tx.send(result);
219                    }
220                    StarlaneCommand::StarlaneApiSelectBest(tx) => {
221                        let map = match self.star_controllers.clone().into_map().await {
222                            Ok(map) => map,
223                            Err(err) => {
224                                tx.send(Err(err));
225                                continue;
226                            }
227                        };
228                        if map.is_empty() {
229                            tx.send(Err(
230                                "ERROR: cannot create StarlaneApi: no StarControllers available."
231                                    .into(),
232                            ));
233                            continue;
234                        }
235                        let values: Vec<StarController> =
236                            map.into_iter().map(|(_k, v)| v).collect();
237
238                        let mut best = Option::None;
239
240                        for star_ctrl in values {
241                            let info = star_ctrl.get_star_info().await.unwrap().unwrap();
242                            if best.is_none() {
243                                best = Option::Some((info, star_ctrl));
244                            } else {
245                                let (prev_info, _) = best.as_ref().unwrap();
246                                match info.kind {
247                                    StarKind::Mesh => {
248                                        best = Option::Some((info, star_ctrl));
249                                    }
250                                    StarKind::Client => {
251                                        if prev_info.kind != StarKind::Mesh {
252                                            best = Option::Some((info, star_ctrl));
253                                        }
254                                    }
255                                    _ => {}
256                                }
257                            }
258                        }
259
260                        let (_info, star_ctrl) = best.unwrap();
261
262                        tx.send(Ok(StarlaneApi::with_starlane_ctrl(
263                            star_ctrl.surface_api,
264                            self.command_tx.clone(),
265                        )));
266                    }
267                    StarlaneCommand::Shutdown => {
268                        let listening = {
269                            let mut inner_flags = self.inner_flags.lock().unwrap();
270                            let mut_flags = inner_flags.get_mut();
271                            mut_flags.shutdown = true;
272                            mut_flags.listening
273                        };
274
275                        if listening {
276                            Self::unlisten(self.inner_flags.clone(), self.port.clone());
277                        }
278
279                        for (_, star_ctrl) in self
280                            .star_controllers
281                            .clone()
282                            .into_map()
283                            .await
284                            .unwrap_or(Default::default())
285                        {
286                            star_ctrl.star_tx.try_send(StarCommand::Shutdown);
287                        }
288
289                        self.star_controllers.clear();
290                        self.command_rx.close();
291                        break;
292                    }
293                    StarlaneCommand::Listen(tx) => {
294                        self.listen(tx);
295                    }
296                    StarlaneCommand::AddStream(stream) => {
297                        match self.select_star_kind(&StarKind::Gateway).await {
298                            Ok(Option::Some(star_ctrl)) => {
299                                match self.add_server_side_lane_ctrl(star_ctrl, stream,OnCloseAction::Remove).await {
300                                    Ok(_result) => {}
301                                    Err(error) => {
302                                        error!("{}", error);
303                                    }
304                                }
305                            }
306                            Ok(Option::None) => {
307                                error!("cannot find StarController for kind: StarKind::Gateway");
308                            }
309                            Err(err) => {
310                                error!("{}", err);
311                            }
312                        }
313                    }
314                    StarlaneCommand::GetProtoArtifactCachesFactory(tx) => {
315                        match self.artifact_caches.as_ref() {
316                            None => {
317                                tx.send(Option::None);
318                            }
319                            Some(caches) => {
320                                tx.send(Option::Some(caches.clone()));
321                            }
322                        }
323                    }
324                }
325            }
326            run_complete_signal_tx.send(());
327        });
328        run_complete_signal_tx_rtn
329    }
330
331    async fn select_star_kind(&self, kind: &StarKind) -> Result<Option<StarController>, Error> {
332        let map = self.star_controllers.clone().into_map().await?;
333        let values: Vec<StarController> = map.into_iter().map(|(_k, v)| v).collect();
334
335        for star_ctrl in values {
336            let info = star_ctrl
337                .get_star_info()
338                .await?
339                .ok_or("expected StarInfo")?;
340            if info.kind == *kind {
341                return Ok(Option::Some(star_ctrl));
342            }
343        }
344
345        Ok(Option::None)
346    }
347
348    async fn constellation_create(
349        &mut self,
350        layout: ConstellationLayout,
351        name: String,
352        starlane_machine: StarlaneMachine,
353    ) -> Result<(), Error> {
354        if self.constellations.contains_key(&name) {
355            return Err(format!(
356                "constellation named '{}' already exists in this StarlaneMachine.",
357                name
358            )
359            .into());
360        }
361
362        let mut constellation = Constellation::new(name.clone());
363        let mut evolve_rxs = vec![];
364        let (constellation_broadcaster, _) = broadcast::channel(16);
365
366        for star_template in layout.template.stars.clone() {
367            constellation.stars.push(star_template.clone());
368
369            let star_template_id =
370                StarInConstellationTemplateHandle::new(name.clone(), star_template.handle.clone());
371
372            let machine = layout
373                .handles_to_machine
374                .get(&star_template.handle)
375                .ok_or(format!(
376                    "expected machine mapping for star template handle: {}",
377                    star_template.handle.to_string()
378                ))?;
379            if self.name == *machine {
380                let star_key = star_template.key.create();
381
382                let (evolve_tx, evolve_rx) = oneshot::channel();
383                evolve_rxs.push(evolve_rx);
384
385                let (star_tx, star_rx) = mpsc::channel(1024);
386                let (surface_tx, surface_rx) = mpsc::channel(1024);
387                let surface_api = SurfaceApi::new(surface_tx);
388
389                let star_ctrl = StarController {
390                    star_tx: star_tx.clone(),
391                    surface_api: surface_api.clone(),
392                };
393                self.star_controllers.put(star_template_id, star_ctrl).await;
394
395                if self.artifact_caches.is_none() {
396                    let api = StarlaneApi::new(surface_api.clone());
397                    let caches = Arc::new(ProtoArtifactCachesFactory::new(
398                        api.into(),
399                        self.cache_access.clone(),
400                        starlane_machine.clone()
401                    )?);
402                    self.artifact_caches = Option::Some(caches);
403                }
404
405                let (proto_star, _star_ctrl) = ProtoStar::new(
406                    star_key.clone(),
407                    star_template.kind.clone(),
408                    star_tx.clone(),
409                    star_rx,
410                    surface_api,
411                    surface_rx,
412                    self.data_access.clone(),
413                    constellation_broadcaster.subscribe(),
414                    self.flags.clone(),
415                    self.logger.clone(),
416                    starlane_machine.clone(),
417                );
418
419                tokio::spawn(async move {
420                    let star = proto_star.evolve().await;
421                    if let Ok(star) = star {
422                        let key = star.star_key().clone();
423
424                        let star_tx = star.star_tx();
425                        let surface_api = star.surface_api();
426                        tokio::spawn(async move {
427                            star.run().await;
428                        });
429                        evolve_tx.send(ProtoStarEvolution {
430                            star: key.clone(),
431                            controller: StarController {
432                                star_tx,
433                                surface_api,
434                            },
435                        });
436                        /*
437                        println!(
438                            "created star: {:?} key: {}",
439                            &star_template.kind,
440                            &key.to_string()
441                        );
442
443                         */
444                    } else {
445                        eprintln!("experienced serious error could not evolve the proto_star");
446                    }
447                });
448            } else {
449                println!(
450                    "skipping star not hosted on this machine: {}",
451                    star_template.handle.to_string()
452                )
453            }
454        }
455
456        // now connect the LANES
457        let mut proto_lane_evolution_rxs = vec![];
458        for star_template in &layout.template.stars {
459            let machine = layout
460                .handles_to_machine
461                .get(&star_template.handle)
462                .ok_or(format!(
463                    "expected machine mapping for star template handle: {}",
464                    star_template.handle.to_string()
465                ))?;
466            let local_star =
467                StarInConstellationTemplateHandle::new(name.clone(), star_template.handle.clone());
468            if self.name == *machine {
469                for lane in &star_template.lanes {
470                    match &lane.star_selector.constellation {
471                        ConstellationSelector::Local => {
472                            let second_star = constellation
473                                .select(lane.star_selector.star.clone())
474                                .ok_or("cannot select star from local constellation")?
475                                .clone();
476                            let second_star = StarInConstellationTemplateHandle::new(
477                                name.clone(),
478                                second_star.handle,
479                            );
480                            let mut evolution_rxs =
481                                self.add_local_lane(local_star.clone(), second_star).await?;
482                            proto_lane_evolution_rxs.append(&mut evolution_rxs);
483                        }
484                        ConstellationSelector::Named(constellation_name) => {
485                            let constellation = self
486                                .constellations
487                                .get(constellation_name)
488                                .ok_or(format!(
489                                "cannot select constellation named '{}' on this StarlaneMachine",
490                                constellation_name
491                            ))?;
492                            let second_star = constellation
493                                .select(lane.star_selector.star.clone())
494                                .ok_or(format!(
495                                    "cannot select star from constellation {}",
496                                    constellation_name
497                                ))?
498                                .clone();
499                            let second_star = StarInConstellationTemplateHandle::new(
500                                constellation.name.clone(),
501                                second_star.handle,
502                            );
503                            let mut evolution_rxs =
504                                self.add_local_lane(local_star.clone(), second_star).await?;
505                            proto_lane_evolution_rxs.append(&mut evolution_rxs);
506                        }
507                        ConstellationSelector::AnyWithGatewayInsideMachine(machine_name) => {
508                            let host_address =
509                                layout.get_machine_host_adddress(machine_name.clone());
510                            let star_ctrl = self
511                                .star_controllers
512                                .get(local_star.clone())
513                                .await?
514                                .ok_or("expected local star to have star_ctrl")?;
515                            let proto_lane_evolution_rx = self
516                                .add_client_side_lane_ctrl(
517                                    star_ctrl,
518                                    host_address,
519                                    lane.star_selector.clone(),
520                                    true,
521                                    OnCloseAction::Remove
522                                )
523                                .await?;
524                            proto_lane_evolution_rxs.push(proto_lane_evolution_rx);
525                        }
526                    }
527                }
528            }
529        }
530
531        let proto_lane_evolutions =
532            join_all(proto_lane_evolution_rxs.iter_mut().map(|x| x.recv())).await;
533
534        for result in proto_lane_evolutions {
535            result??;
536        }
537
538        // announce that the local constellation is now complete
539        constellation_broadcaster.send(ConstellationBroadcast::Status(
540            ConstellationStatus::Assembled,
541        ));
542
543        let evolutions = join_all(evolve_rxs).await;
544
545        for evolve in evolutions {
546            if let Ok(evolve) = evolve {
547                evolve.controller.surface_api.init();
548            } else if let Err(error) = evolve {
549                return Err(error.to_string().into());
550            }
551        }
552
553        let mut ready_futures = vec![];
554        for star_template in &layout.template.stars {
555            let machine = layout
556                .handles_to_machine
557                .get(&star_template.handle)
558                .ok_or(format!(
559                    "expected machine mapping for star template handle: {}",
560                    star_template.handle.to_string()
561                ))?;
562            if self.name == *machine {
563                let local_star = StarInConstellationTemplateHandle::new(
564                    name.clone(),
565                    star_template.handle.clone(),
566                );
567                let star_ctrl =
568                    self.star_controllers
569                        .get(local_star.clone())
570                        .await?
571                        .ok_or(format!(
572                            "expected star controller: {}",
573                            local_star.to_string()
574                        ))?;
575                let (tx, rx) = oneshot::channel();
576                star_ctrl
577                    .star_tx
578                    .send(StarCommand::GetStatusListener(tx))
579                    .await;
580                let mut star_status_receiver = rx.await?;
581                let (ready_status_tx, ready_status_rx) = oneshot::channel();
582                tokio::spawn(async move {
583                    while let Result::Ok(status) = star_status_receiver.recv().await {
584                        if status == StarStatus::Ready {
585                            ready_status_tx.send(());
586                            break;
587                        }
588                    }
589                });
590                ready_futures.push(ready_status_rx);
591            }
592        }
593
594        // wait for all stars to be StarStatus::Ready
595        join_all(ready_futures).await;
596
597        Ok(())
598    }
599
600    fn listen(&mut self, result_tx: oneshot::Sender<Result<(), Error>>) {
601        {
602            let mut inner_flags = self.inner_flags.lock().unwrap();
603            let flags = inner_flags.get_mut();
604
605            if flags.listening {
606                result_tx.send(Ok(()));
607                return;
608            }
609            flags.listening = true;
610        }
611
612        {
613            let _port = self.port.clone();
614            let _inner_flags = self.inner_flags.clone();
615
616            /*            ctrlc::set_handler( move || {
617                           Self::unlisten(inner_flags.clone(), port.clone());
618                       }).expect("expected to be able to set ctrl-c handler");
619            */
620        }
621
622        let port = self.port.clone();
623        let command_tx = self.command_tx.clone();
624        let flags = self.inner_flags.clone();
625        tokio::spawn(async move {
626            match std::net::TcpListener::bind(format!("127.0.0.1:{}", port)) {
627                Ok(std_listener) => {
628                    let listener = TcpListener::from_std(std_listener).unwrap();
629                    result_tx.send(Ok(()));
630                    while let Ok((stream, _)) = listener.accept().await {
631                        {
632                            let mut flags = flags.lock().unwrap();
633                            let flags = flags.get_mut();
634                            if flags.shutdown {
635                                drop(listener);
636                                return;
637                            }
638                        }
639                        let _ok = command_tx
640                            .send(StarlaneCommand::AddStream(stream))
641                            .await
642                            .is_ok();
643                        tokio::time::sleep(Duration::from_secs(0)).await;
644                    }
645                }
646                Err(error) => {
647                    error!("FATAL: could not setup TcpListener {}", error);
648                    result_tx.send(Err(error.into()));
649                }
650            }
651        });
652    }
653
654    pub fn caches(&self) -> Result<Arc<ProtoArtifactCachesFactory>, Error> {
655        Ok(self
656            .artifact_caches
657            .as_ref()
658            .ok_or("expected caches to be set")?
659            .clone())
660    }
661
662    async fn add_local_lane(
663        &mut self,
664        local: StarInConstellationTemplateHandle,
665        second: StarInConstellationTemplateHandle,
666    ) -> Result<Vec<broadcast::Receiver<Result<(), Error>>>, Error> {
667        let (high, low) = crate::util::sort(local, second)?;
668
669        let high_star_ctrl = {
670            let high_star_ctrl = self.star_controllers.get(high.clone()).await?;
671            match high_star_ctrl {
672                None => {
673                    return Err(format!(
674                        "lane cannot construct. missing local star key: {}",
675                        high.star.to_string()
676                    )
677                    .into());
678                }
679                Some(high_star_ctrl) => high_star_ctrl.clone(),
680            }
681        };
682
683        let low_star_ctrl = {
684            let low_star_ctrl = self.star_controllers.get(low.clone()).await?;
685            match low_star_ctrl {
686                None => {
687                    return Err(format!(
688                        "lane cannot construct. missing second star key: {}",
689                        low.star.to_string()
690                    )
691                    .into());
692                }
693                Some(low_star_ctrl) => low_star_ctrl.clone(),
694            }
695        };
696        self.add_local_lane_ctrl(high_star_ctrl, low_star_ctrl)
697            .await
698    }
699
700    async fn add_local_lane_ctrl(
701        &mut self,
702        high_star_ctrl: StarController,
703        low_star_ctrl: StarController,
704    ) -> Result<Vec<broadcast::Receiver<Result<(), Error>>>, Error> {
705        let high_lane = ProtoLaneEnd::new(Option::None, OnCloseAction::Remove );
706        let low_lane = ProtoLaneEnd::new(Option::None, OnCloseAction::Remove );
707        let rtn = vec![high_lane.get_evoltion_rx(), low_lane.get_evoltion_rx()];
708        let connector = LocalTunnelConnector::new(&high_lane, &low_lane).await?;
709        high_star_ctrl
710            .star_tx
711            .send(StarCommand::AddProtoLaneEndpoint(high_lane))
712            .await?;
713        low_star_ctrl
714            .star_tx
715            .send(StarCommand::AddProtoLaneEndpoint(low_lane))
716            .await?;
717        high_star_ctrl
718            .star_tx
719            .send(StarCommand::AddConnectorController(connector))
720            .await?;
721
722        Ok(rtn)
723    }
724
725    async fn add_server_side_lane_ctrl(
726        &mut self,
727        low_star_ctrl: StarController,
728        stream: TcpStream,
729        on_close_action: OnCloseAction
730    ) -> Result<broadcast::Receiver<Result<(), Error>>, Error> {
731        let low_lane = ProtoLaneEnd::new(Option::None, on_close_action  );
732        let rtn = low_lane.get_evoltion_rx();
733
734        let connector_ctrl = ServerSideTunnelConnector::new(&low_lane, stream).await?;
735
736        low_star_ctrl
737            .star_tx
738            .send(StarCommand::AddProtoLaneEndpoint(low_lane))
739            .await?;
740
741        low_star_ctrl
742            .star_tx
743            .send(StarCommand::AddConnectorController(connector_ctrl))
744            .await?;
745
746        Ok(rtn)
747    }
748
749    async fn add_client_side_lane_ctrl(
750        &mut self,
751        star_ctrl: StarController,
752        host_address: String,
753        selector: StarInConstellationTemplateSelector,
754        key_requestor: bool,
755        on_close_action: OnCloseAction
756
757    ) -> Result<broadcast::Receiver<Result<(), Error>>, Error> {
758        let mut lane = ProtoLaneEnd::new(Option::None, on_close_action);
759        lane.key_requestor = key_requestor;
760
761        let rtn = lane.get_evoltion_rx();
762
763        let connector = ClientSideTunnelConnector::new(&lane, host_address, selector).await?;
764
765        star_ctrl
766            .star_tx
767            .send(StarCommand::AddProtoLaneEndpoint(lane))
768            .await?;
769
770        star_ctrl
771            .star_tx
772            .send(StarCommand::AddConnectorController(connector))
773            .await?;
774
775        Ok(rtn)
776    }
777
778    fn unlisten(inner_flags: Arc<Mutex<Cell<StarlaneInnerFlags>>>, port: usize) {
779        {
780            let mut flags = inner_flags.lock().unwrap();
781            flags.get_mut().shutdown = true;
782        }
783        std::net::TcpStream::connect(format!("localhost:{}", port));
784    }
785}
786
787impl Drop for StarlaneMachineRunner {
788    fn drop(&mut self) {
789        {
790            let mut flags = self.inner_flags.lock().unwrap();
791
792            let flags_mut = flags.get_mut();
793
794            if !flags_mut.shutdown {
795                warn!("dropping Starlane( {} ) unexpectedly", self.name);
796            }
797
798            if !flags_mut.listening {
799                Self::unlisten(self.inner_flags.clone(), self.port.clone());
800            }
801        }
802    }
803}
804
805#[derive(Clone, Serialize, Deserialize)]
806pub struct VersionFrame {
807    product: String,
808    version: String,
809}
810
811#[derive(strum_macros::Display)]
812pub enum StarlaneCommand {
813    ConstellationCreate(ConstellationCreate),
814    StarlaneApiSelectBest(oneshot::Sender<Result<StarlaneApi, Error>>),
815    Listen(oneshot::Sender<Result<(), Error>>),
816    AddStream(TcpStream),
817    GetProtoArtifactCachesFactory(oneshot::Sender<Option<Arc<ProtoArtifactCachesFactory>>>),
818    Shutdown,
819}
820
821pub struct StarlaneApiRequestByKey {
822    pub star: StarKey,
823    pub tx: oneshot::Sender<StarlaneApi>,
824}
825
826pub struct StarlaneApiRequest {
827    pub selector: StarSelector,
828    pub tx: oneshot::Sender<StarlaneApi>,
829}
830
831impl StarlaneApiRequest {
832    pub fn new(selector: StarSelector) -> (Self, oneshot::Receiver<StarlaneApi>) {
833        let (tx, rx) = oneshot::channel();
834        (
835            Self {
836                selector: selector,
837                tx: tx,
838            },
839            rx,
840        )
841    }
842}
843
844pub struct ConstellationCreate {
845    name: String,
846    layout: ConstellationLayout,
847    tx: oneshot::Sender<Result<(), Error>>,
848    machine: StarlaneMachine,
849}
850
851impl ConstellationCreate {
852    pub fn new(
853        layout: ConstellationLayout,
854        name: String,
855        machine: StarlaneMachine,
856    ) -> (Self, oneshot::Receiver<Result<(), Error>>) {
857        let (tx, rx) = oneshot::channel();
858        (
859            ConstellationCreate {
860                name: name,
861                layout: layout,
862                tx: tx,
863                machine: machine,
864            },
865            rx,
866        )
867    }
868}
869
/// Address of a star; `Local` is currently the only variant.
pub enum StarAddress {
    /// The local address variant.
    Local,
}
873
/// Lifecycle flags of a Starlane machine.
///
/// Both flags start out `false`, whether built with [`StarlaneInnerFlags::new`]
/// or through `Default`.
#[derive(Debug, Clone, Default)]
struct StarlaneInnerFlags {
    /// Set to `true` by `unlisten`.
    pub shutdown: bool,
    /// Listening state. NOTE(review): the code that sets this flag is not
    /// visible in this part of the file.
    pub listening: bool,
}

impl StarlaneInnerFlags {
    /// Creates the flags with both `shutdown` and `listening` cleared.
    pub fn new() -> Self {
        Self::default()
    }
}
888
889#[cfg(test)]
890mod test {
891    use std::convert::TryInto;
892    use std::fs;
893    use std::fs::File;
894    use std::io::Read;
895    use std::str::FromStr;
896    use std::sync::Arc;
897
898    use starlane_resources::ArtifactBundlePath;
899    use tokio::runtime::Runtime;
900    use tokio::sync::oneshot;
901    use tokio::sync::oneshot::error::RecvError;
902    use tokio::time::timeout;
903    use tokio::time::Duration;
904    use tracing::dispatcher::set_global_default;
905    use tracing_subscriber::FmtSubscriber;
906
907    use crate::artifact::ArtifactLocation;
908    use crate::error::Error;
909    use crate::logger::{
910        Flag, Flags, Log, LogAggregate, ProtoStarLog, ProtoStarLogPayload, StarFlag, StarLog,
911        StarLogPayload,
912    };
913    use starlane_resources::message::Fail;
914    use crate::names::Name;
915    use crate::permissions::Authentication;
916    use crate::resource::ArtifactBundleAddress;
917    use crate::resource::{ResourceAddress};
918    use crate::space::CreateAppControllerFail;
919    use crate::star::{StarController, StarInfo, StarKey, StarKind};
920    use crate::starlane::api::SubSpaceApi;
921    use crate::starlane::{
922        ConstellationCreate, StarlaneApiRequest, StarlaneCommand, StarlaneMachine,
923        StarlaneMachineRunner,
924    };
925    use crate::template::{ConstellationLayout, ConstellationTemplate};
926
927    #[test]
928    #[instrument]
929    pub fn tracing() {
930        let subscriber = FmtSubscriber::default();
931        set_global_default(subscriber.into()).expect("setting global default failed");
932        info!("tracing works!");
933    }
934
935
936    #[test]
937    pub fn mechtron() {
938println!("Mechtron..");
939        let subscriber = FmtSubscriber::default();
940        set_global_default(subscriber.into()).expect("setting global default failed");
941
942        let data_dir = "tmp/data";
943        let cache_dir = "tmp/cache";
944        fs::remove_dir_all(data_dir).unwrap_or_default();
945        fs::remove_dir_all(cache_dir).unwrap_or_default();
946        std::env::set_var("STARLANE_DATA", data_dir);
947        std::env::set_var("STARLANE_CACHE", cache_dir);
948
949        println!("Hello");
950        let rt = Runtime::new().unwrap();
951        rt.block_on(async {
952println!("block ON..");
953            async fn test() -> Result<(),Error> {
954                let mut starlane = StarlaneMachine::new("server".to_string()).unwrap();
955
956                starlane.listen().await.unwrap();
957
958                starlane.create_constellation("standalone", ConstellationLayout::standalone().unwrap()) .await?;
959
960println!("POST CREATE CONSTELLATION");
961
962                let starlane_api = starlane.get_starlane_api().await.unwrap();
963
964                let sub_space_api = starlane_api.get_sub_space( ResourceAddress::from_str("hyperspace:starlane<SubSpace>").unwrap() .into(), ) .await?;
965
966                {
967                    let mut file =
968                        File::open("../../wasm/appy/appy.zip").unwrap();
969                    let mut data = vec![];
970                    file.read_to_end(&mut data).unwrap();
971                    let address =  ResourceAddress::from_str("hyperspace:starlane:appy:1.0.0<ArtifactBundle>")?;
972                    let mut creation = sub_space_api
973                        .create_artifact_bundle_versions(address.parent().unwrap().name().as_str())?;
974                    let artifact_bundle_versions_api = creation.submit().await?;
975
976                    let version = semver::Version::from_str(address.name().as_str())?;
977                    let mut creation = artifact_bundle_versions_api.create_artifact_bundle(
978                        version,
979                        Arc::new(data),
980                    )?;
981                    creation.submit().await?;
982                }
983println!("appy bundle published");
984
985
986
987                let config = ResourceAddress::from_str("hyperspace:starlane:appy:1.0.0:/app/appy-config.yaml<Artifact>")?;
988                let app_api = sub_space_api.create_app("appy", config.try_into()? )?.submit().await?;
989
990println!("app created: {}", app_api.key().to_string() );
991
992                std::thread::sleep(std::time::Duration::from_secs(10));
993
994                starlane.shutdown();
995
996                std::thread::sleep(std::time::Duration::from_secs(1));
997
998                Ok(())
999            }
1000            match test().await {
1001                Ok(_) => {}
1002                Err(error) => {
1003                    eprintln!("{}",error.to_string());
1004                    assert!(false);
1005                }
1006            }
1007        });
1008    }
1009
1010    /*
1011    #[test]
1012    pub fn starlane() {
1013        let subscriber = FmtSubscriber::default();
1014        set_global_default(subscriber.into()).expect("setting global default failed");
1015
1016        let data_dir = "tmp/data";
1017        let cache_dir = "tmp/cache";
1018        fs::remove_dir_all(data_dir).unwrap_or_default();
1019        fs::remove_dir_all(cache_dir).unwrap_or_default();
1020        std::env::set_var("STARLANE_DATA", data_dir);
1021        std::env::set_var("STARLANE_CACHE", cache_dir);
1022
1023        println!("Hello");
1024        let rt = Runtime::new().unwrap();
1025        rt.block_on(async {
1026            let mut starlane = StarlaneMachine::new("server".to_string()).unwrap();
1027            starlane.listen().await.unwrap();
1028
1029            tokio::spawn(async {
1030                println!("PRE CREATE CONSTELLATION");
1031            });
1032
1033            starlane
1034                .create_constellation("standalone", ConstellationLayout::standalone().unwrap())
1035                .await
1036                .unwrap();
1037
1038            tokio::spawn(async {
1039                println!("POST CREATE CONSTELLATION");
1040            });
1041
1042            let mut client = StarlaneMachine::new_with_artifact_caches(
1043                "client".to_string(),
1044                starlane.get_proto_artifact_caches_factory().await.unwrap(),
1045            )
1046            .unwrap();
1047            let mut client_layout = ConstellationLayout::client("gateway".to_string()).unwrap();
1048            client_layout.set_machine_host_address(
1049                "gateway".to_lowercase(),
1050                format!("localhost:{}", crate::starlane::DEFAULT_PORT.clone()),
1051            );
1052            client
1053                .create_constellation("client", client_layout)
1054                .await
1055                .unwrap();
1056
1057            tokio::time::sleep(Duration::from_secs(1)).await;
1058            tokio::spawn(async {
1059                println!("GOT TO FIRST SLEEP");
1060            });
1061
1062            let starlane_api = client.get_starlane_api().await.unwrap();
1063
1064            if starlane_api.ping_gateway().await.is_err() {
1065                error!("failed to ping gateway");
1066                client.shutdown();
1067                starlane.shutdown();
1068                return;
1069            }
1070            tokio::spawn(async {
1071                println!("PING GATEWAY");
1072            });
1073
1074            let sub_space_api = match starlane_api
1075                .get_sub_space(
1076                    ResourceAddress::from_str("hyperspace:default::<SubSpace>")
1077                        .unwrap()
1078                        .into(),
1079                )
1080                .await
1081            {
1082                Ok(api) => api,
1083                Err(err) => {
1084                    eprintln!("{}", err.to_string());
1085                    panic!(err)
1086                }
1087            };
1088
1089            let file_api = sub_space_api
1090                .create_file_system("website")
1091                .unwrap()
1092                .submit()
1093                .await
1094                .unwrap();
1095            file_api
1096                .create_file_from_string(
1097                    &"/index.html".try_into().unwrap(),
1098                    "The rain in Spain falls mostly on the plain.".to_string(),
1099                )
1100                .unwrap()
1101                .submit()
1102                .await
1103                .unwrap();
1104            file_api
1105                .create_file_from_string(
1106                    &"/second/index.html".try_into().unwrap(),
1107                    "This is a second page....".to_string(),
1108                )
1109                .unwrap()
1110                .submit()
1111                .await
1112                .unwrap();
1113
1114            tokio::spawn(async {
1115                println!("FILE API");
1116            });
1117
1118            /*
1119            // upload an artifact bundle
1120            {
1121                let mut file =
1122                    File::open("test-data/localhost-config/artifact-bundle.zip").unwrap();
1123                let mut data = vec![];
1124                file.read_to_end(&mut data).unwrap();
1125                let data = Arc::new(data);
1126                let artifact_bundle_api = starlane_api
1127                    .create_artifact_bundle(
1128                        &ArtifactBundleAddress::from_str("hyperspace:default:whiz:1.0.0").unwrap(),
1129                        data,
1130                    ).await
1131                    .unwrap()
1132                    .submit()
1133                    .await
1134                    .unwrap();
1135            }
1136             */
1137
1138            // upload an artifact bundle
1139            {
1140                let mut file =
1141                    File::open("test-data/localhost-config/artifact-bundle.zip").unwrap();
1142                let mut data = vec![];
1143                file.read_to_end(&mut data).unwrap();
1144                let data = Arc::new(data);
1145                //let artifact_bundle_path = "hyperspace:starlane:filo:1.0.0<ArtifactBundle>";
1146                let artifact_bundle_path = "hyperspace:starlane:filo:1.0.0<ArtifactBundle>";
1147                let artifact_bundle_path =
1148                    ArtifactBundlePath::from_str(artifact_bundle_path).unwrap();
1149                let artifact_bundle_api = starlane_api
1150                    .create_artifact_bundle(&artifact_bundle_path, data)
1151                    .await
1152                    .unwrap();
1153            }
1154
1155            let bundle: ResourceAddress = match ResourceAddress::from_str("hyperspace::<Space>") {
1156                Ok(ok) => ok,
1157                Err(error) => {
1158                    error!("error: {}", error.to_string());
1159                    panic!("cannot continue")
1160                }
1161            };
1162
1163            //            let bundle: ResourceAddress = ArtifactBundleAddress::from_str("hyperspace:default:filo:1.0.0").unwrap().into();
1164            let resources = starlane_api.list(&bundle.clone().into()).await.unwrap();
1165
1166            tokio::spawn(async move {
1167                println!(
1168                    "returned resources: {} from {}",
1169                    resources.len(),
1170                    bundle.to_string()
1171                );
1172                for resource in resources {
1173                    println!(
1174                        "{}\t{}",
1175                        resource.stub.key.to_string(),
1176                        resource.stub.address.to_string()
1177                    )
1178                }
1179            });
1180
1181            std::thread::sleep(std::time::Duration::from_secs(5));
1182
1183            client.shutdown();
1184            starlane.shutdown();
1185
1186            std::thread::sleep(std::time::Duration::from_secs(1));
1187        });
1188    }
1189
1190     */
1191}