1use std::cell::Cell;
2
3use std::collections::HashMap;
4
5use std::sync::{Arc, Mutex};
6
7use std::time::Duration;
8
9use futures::future::join_all;
10use futures::{FutureExt, StreamExt};
11
12use serde::{Deserialize, Serialize};
13use tokio::net::{TcpListener, TcpStream};
14use tokio::sync::oneshot;
15use tokio::sync::{broadcast, mpsc};
16
17use crate::cache::ProtoArtifactCachesFactory;
18use crate::constellation::{Constellation, ConstellationStatus};
19use crate::error::Error;
20use crate::file_access::FileAccess;
21
22use crate::lane::{ClientSideTunnelConnector, LocalTunnelConnector, ProtoLaneEnd, ServerSideTunnelConnector, OnCloseAction};
23use crate::logger::{Flags, Logger};
24
25use crate::proto::{
26 local_tunnels, ProtoStar, ProtoStarController, ProtoStarEvolution, ProtoTunnel,
27};
28
29use starlane_resources::data::BinContext;
30use crate::star::surface::SurfaceApi;
31use crate::star::{ConstellationBroadcast, StarKind, StarStatus};
32use crate::star::{Request, Star, StarCommand, StarController, StarInfo, StarKey, StarTemplateId};
33use crate::starlane::api::StarlaneApi;
34use crate::starlane::files::MachineFileSystem;
35use crate::template::{
36 ConstellationData, ConstellationLayout, ConstellationSelector, ConstellationTemplate,
37 ConstellationTemplateHandle, MachineName, StarInConstellationTemplateHandle,
38 StarInConstellationTemplateSelector, StarKeyConstellationIndexTemplate,
39 StarKeySubgraphTemplate, StarKeyTemplate, StarSelector, StarTemplate, StarTemplateHandle,
40};
41use crate::util::AsyncHashMap;
42
43pub mod api;
44pub mod files;
45
46lazy_static! {
47pub static ref DEFAULT_PORT: usize = std::env::var("STARLANE_PORT").unwrap_or("4343".to_string()).parse::<usize>().unwrap_or(4343);
49
50 pub static ref VERSION: VersionFrame = VersionFrame{ product: "Starlane".to_string(), version: "1.0.0".to_string() };
51}
52
53#[derive(Clone)]
54pub struct StarlaneMachine {
55 tx: mpsc::Sender<StarlaneCommand>,
56 run_complete_signal_tx: broadcast::Sender<()>,
57 machine_filesystem: Arc<MachineFileSystem>,
58}
59
60impl StarlaneMachine {
61 pub fn new(name: MachineName) -> Result<Self, Error> {
62 Self::new_with_artifact_caches(name, Option::None)
63 }
64
65 pub fn new_with_artifact_caches(
66 name: MachineName,
67 artifact_caches: Option<Arc<ProtoArtifactCachesFactory>>,
68 ) -> Result<Self, Error> {
69 let runner = StarlaneMachineRunner::new_with_artifact_caches(name, artifact_caches)?;
70 let tx = runner.command_tx.clone();
71 let run_complete_signal_tx = runner.run();
72 let starlane = Self {
73 tx: tx,
74 run_complete_signal_tx: run_complete_signal_tx,
75 machine_filesystem: Arc::new(MachineFileSystem::new()?),
76 };
77
78 Result::Ok(starlane)
79 }
80
81 pub async fn get_proto_artifact_caches_factory(
82 &self,
83 ) -> Result<Arc<ProtoArtifactCachesFactory>,Error> {
84 let (tx, rx) = oneshot::channel();
85 self.tx
86 .send(StarlaneCommand::GetProtoArtifactCachesFactory(tx))
87 .await?;
88 Ok(rx.await?.ok_or("expected proto artifact cache")?)
89
90 }
91
92 pub fn bin_context(&self) -> Arc<dyn BinContext> {
93 self.machine_filesystem.clone()
94 }
95
96 pub fn machine_filesystem(&self) -> Arc<MachineFileSystem> {
97 self.machine_filesystem.clone()
98 }
99
100 pub fn shutdown(&self) {
101 let tx = self.tx.clone();
102 tokio::spawn(async move {
103 tx.send(StarlaneCommand::Shutdown).await;
104 });
105 }
106
107 pub async fn create_constellation(
108 &self,
109 name: &str,
110 layout: ConstellationLayout,
111 ) -> Result<(), Error> {
112 let name = name.to_string();
113 let (tx, rx) = oneshot::channel();
114 let create = ConstellationCreate {
115 name,
116 layout,
117 tx,
118 machine: self.clone(),
119 };
120
121 self.tx
122 .send(StarlaneCommand::ConstellationCreate(create))
123 .await?;
124 rx.await?
125 }
126
127 pub async fn get_starlane_api(&self) -> Result<StarlaneApi, Error> {
128 let (tx, rx) = oneshot::channel();
129 self.tx
130 .send(StarlaneCommand::StarlaneApiSelectBest(tx))
131 .await?;
132 rx.await?
133 }
134
135 pub async fn listen(&self) -> Result<(), Error> {
136 let command_tx = self.tx.clone();
137 let (tx, rx) = oneshot::channel();
138 command_tx.send(StarlaneCommand::Listen(tx)).await;
139 rx.await?
140 }
141
142 pub async fn join(self) {
143 let mut run_complete_signal_rx = self.run_complete_signal_tx.subscribe();
144 join!(run_complete_signal_rx.recv());
145 }
146}
147
148pub struct StarlaneMachineRunner {
149 name: MachineName,
150 pub command_tx: mpsc::Sender<StarlaneCommand>,
151 command_rx: mpsc::Receiver<StarlaneCommand>,
152 star_controllers: AsyncHashMap<StarInConstellationTemplateHandle, StarController>,
153 data_access: FileAccess,
155 cache_access: FileAccess,
156 pub logger: Logger,
157 pub flags: Flags,
158 pub artifact_caches: Option<Arc<ProtoArtifactCachesFactory>>,
159 constellations: HashMap<String, Constellation>,
160 port: usize,
161 inner_flags: Arc<Mutex<Cell<StarlaneInnerFlags>>>,
162}
163
164impl StarlaneMachineRunner {
165 pub fn new(machine: String) -> Result<Self, Error> {
166 Self::new_with_artifact_caches(machine, Option::None)
167 }
168
169 pub fn new_with_artifact_caches(
170 machine: String,
171 artifact_caches: Option<Arc<ProtoArtifactCachesFactory>>,
172 ) -> Result<Self, Error> {
173 let (command_tx, command_rx) = mpsc::channel(32);
174 Ok(StarlaneMachineRunner {
175 name: machine,
176 star_controllers: AsyncHashMap::new(),
177 command_tx,
178 command_rx,
179 logger: Logger::new(),
181 flags: Flags::new(),
182 data_access: FileAccess::new(
183 std::env::var("STARLANE_DATA").unwrap_or("data".to_string()),
184 )?,
185 cache_access: FileAccess::new(
186 std::env::var("STARLANE_CACHE").unwrap_or("cache".to_string()),
187 )?,
188 artifact_caches: artifact_caches,
189 port: DEFAULT_PORT.clone(),
190 constellations: HashMap::new(),
191 inner_flags: Arc::new(Mutex::new(Cell::new(StarlaneInnerFlags::new()))),
192 })
193 }
194
195 pub fn run(mut self) -> broadcast::Sender<()> {
196 let command_tx = self.command_tx.clone();
197 tokio::spawn(async move {
198 let mut shutdown_rx = crate::util::SHUTDOWN_TX.subscribe();
199 shutdown_rx.recv().await;
200 command_tx.try_send(StarlaneCommand::Shutdown);
201 });
202
203 let (run_complete_signal_tx, _) = broadcast::channel(1);
204 let run_complete_signal_tx_rtn = run_complete_signal_tx.clone();
205
206 tokio::spawn(async move {
207 while let Option::Some(command) = self.command_rx.recv().await {
208 match command {
209 StarlaneCommand::ConstellationCreate(command) => {
210 let result = self
211 .constellation_create(command.layout, command.name, command.machine)
212 .await;
213
214 if let Err(error) = &result {
216 error!("CONSTELLATION CREATE ERROR: {}", error.to_string());
217 }
218 command.tx.send(result);
219 }
220 StarlaneCommand::StarlaneApiSelectBest(tx) => {
221 let map = match self.star_controllers.clone().into_map().await {
222 Ok(map) => map,
223 Err(err) => {
224 tx.send(Err(err));
225 continue;
226 }
227 };
228 if map.is_empty() {
229 tx.send(Err(
230 "ERROR: cannot create StarlaneApi: no StarControllers available."
231 .into(),
232 ));
233 continue;
234 }
235 let values: Vec<StarController> =
236 map.into_iter().map(|(_k, v)| v).collect();
237
238 let mut best = Option::None;
239
240 for star_ctrl in values {
241 let info = star_ctrl.get_star_info().await.unwrap().unwrap();
242 if best.is_none() {
243 best = Option::Some((info, star_ctrl));
244 } else {
245 let (prev_info, _) = best.as_ref().unwrap();
246 match info.kind {
247 StarKind::Mesh => {
248 best = Option::Some((info, star_ctrl));
249 }
250 StarKind::Client => {
251 if prev_info.kind != StarKind::Mesh {
252 best = Option::Some((info, star_ctrl));
253 }
254 }
255 _ => {}
256 }
257 }
258 }
259
260 let (_info, star_ctrl) = best.unwrap();
261
262 tx.send(Ok(StarlaneApi::with_starlane_ctrl(
263 star_ctrl.surface_api,
264 self.command_tx.clone(),
265 )));
266 }
267 StarlaneCommand::Shutdown => {
268 let listening = {
269 let mut inner_flags = self.inner_flags.lock().unwrap();
270 let mut_flags = inner_flags.get_mut();
271 mut_flags.shutdown = true;
272 mut_flags.listening
273 };
274
275 if listening {
276 Self::unlisten(self.inner_flags.clone(), self.port.clone());
277 }
278
279 for (_, star_ctrl) in self
280 .star_controllers
281 .clone()
282 .into_map()
283 .await
284 .unwrap_or(Default::default())
285 {
286 star_ctrl.star_tx.try_send(StarCommand::Shutdown);
287 }
288
289 self.star_controllers.clear();
290 self.command_rx.close();
291 break;
292 }
293 StarlaneCommand::Listen(tx) => {
294 self.listen(tx);
295 }
296 StarlaneCommand::AddStream(stream) => {
297 match self.select_star_kind(&StarKind::Gateway).await {
298 Ok(Option::Some(star_ctrl)) => {
299 match self.add_server_side_lane_ctrl(star_ctrl, stream,OnCloseAction::Remove).await {
300 Ok(_result) => {}
301 Err(error) => {
302 error!("{}", error);
303 }
304 }
305 }
306 Ok(Option::None) => {
307 error!("cannot find StarController for kind: StarKind::Gateway");
308 }
309 Err(err) => {
310 error!("{}", err);
311 }
312 }
313 }
314 StarlaneCommand::GetProtoArtifactCachesFactory(tx) => {
315 match self.artifact_caches.as_ref() {
316 None => {
317 tx.send(Option::None);
318 }
319 Some(caches) => {
320 tx.send(Option::Some(caches.clone()));
321 }
322 }
323 }
324 }
325 }
326 run_complete_signal_tx.send(());
327 });
328 run_complete_signal_tx_rtn
329 }
330
331 async fn select_star_kind(&self, kind: &StarKind) -> Result<Option<StarController>, Error> {
332 let map = self.star_controllers.clone().into_map().await?;
333 let values: Vec<StarController> = map.into_iter().map(|(_k, v)| v).collect();
334
335 for star_ctrl in values {
336 let info = star_ctrl
337 .get_star_info()
338 .await?
339 .ok_or("expected StarInfo")?;
340 if info.kind == *kind {
341 return Ok(Option::Some(star_ctrl));
342 }
343 }
344
345 Ok(Option::None)
346 }
347
348 async fn constellation_create(
349 &mut self,
350 layout: ConstellationLayout,
351 name: String,
352 starlane_machine: StarlaneMachine,
353 ) -> Result<(), Error> {
354 if self.constellations.contains_key(&name) {
355 return Err(format!(
356 "constellation named '{}' already exists in this StarlaneMachine.",
357 name
358 )
359 .into());
360 }
361
362 let mut constellation = Constellation::new(name.clone());
363 let mut evolve_rxs = vec![];
364 let (constellation_broadcaster, _) = broadcast::channel(16);
365
366 for star_template in layout.template.stars.clone() {
367 constellation.stars.push(star_template.clone());
368
369 let star_template_id =
370 StarInConstellationTemplateHandle::new(name.clone(), star_template.handle.clone());
371
372 let machine = layout
373 .handles_to_machine
374 .get(&star_template.handle)
375 .ok_or(format!(
376 "expected machine mapping for star template handle: {}",
377 star_template.handle.to_string()
378 ))?;
379 if self.name == *machine {
380 let star_key = star_template.key.create();
381
382 let (evolve_tx, evolve_rx) = oneshot::channel();
383 evolve_rxs.push(evolve_rx);
384
385 let (star_tx, star_rx) = mpsc::channel(1024);
386 let (surface_tx, surface_rx) = mpsc::channel(1024);
387 let surface_api = SurfaceApi::new(surface_tx);
388
389 let star_ctrl = StarController {
390 star_tx: star_tx.clone(),
391 surface_api: surface_api.clone(),
392 };
393 self.star_controllers.put(star_template_id, star_ctrl).await;
394
395 if self.artifact_caches.is_none() {
396 let api = StarlaneApi::new(surface_api.clone());
397 let caches = Arc::new(ProtoArtifactCachesFactory::new(
398 api.into(),
399 self.cache_access.clone(),
400 starlane_machine.clone()
401 )?);
402 self.artifact_caches = Option::Some(caches);
403 }
404
405 let (proto_star, _star_ctrl) = ProtoStar::new(
406 star_key.clone(),
407 star_template.kind.clone(),
408 star_tx.clone(),
409 star_rx,
410 surface_api,
411 surface_rx,
412 self.data_access.clone(),
413 constellation_broadcaster.subscribe(),
414 self.flags.clone(),
415 self.logger.clone(),
416 starlane_machine.clone(),
417 );
418
419 tokio::spawn(async move {
420 let star = proto_star.evolve().await;
421 if let Ok(star) = star {
422 let key = star.star_key().clone();
423
424 let star_tx = star.star_tx();
425 let surface_api = star.surface_api();
426 tokio::spawn(async move {
427 star.run().await;
428 });
429 evolve_tx.send(ProtoStarEvolution {
430 star: key.clone(),
431 controller: StarController {
432 star_tx,
433 surface_api,
434 },
435 });
436 } else {
445 eprintln!("experienced serious error could not evolve the proto_star");
446 }
447 });
448 } else {
449 println!(
450 "skipping star not hosted on this machine: {}",
451 star_template.handle.to_string()
452 )
453 }
454 }
455
456 let mut proto_lane_evolution_rxs = vec![];
458 for star_template in &layout.template.stars {
459 let machine = layout
460 .handles_to_machine
461 .get(&star_template.handle)
462 .ok_or(format!(
463 "expected machine mapping for star template handle: {}",
464 star_template.handle.to_string()
465 ))?;
466 let local_star =
467 StarInConstellationTemplateHandle::new(name.clone(), star_template.handle.clone());
468 if self.name == *machine {
469 for lane in &star_template.lanes {
470 match &lane.star_selector.constellation {
471 ConstellationSelector::Local => {
472 let second_star = constellation
473 .select(lane.star_selector.star.clone())
474 .ok_or("cannot select star from local constellation")?
475 .clone();
476 let second_star = StarInConstellationTemplateHandle::new(
477 name.clone(),
478 second_star.handle,
479 );
480 let mut evolution_rxs =
481 self.add_local_lane(local_star.clone(), second_star).await?;
482 proto_lane_evolution_rxs.append(&mut evolution_rxs);
483 }
484 ConstellationSelector::Named(constellation_name) => {
485 let constellation = self
486 .constellations
487 .get(constellation_name)
488 .ok_or(format!(
489 "cannot select constellation named '{}' on this StarlaneMachine",
490 constellation_name
491 ))?;
492 let second_star = constellation
493 .select(lane.star_selector.star.clone())
494 .ok_or(format!(
495 "cannot select star from constellation {}",
496 constellation_name
497 ))?
498 .clone();
499 let second_star = StarInConstellationTemplateHandle::new(
500 constellation.name.clone(),
501 second_star.handle,
502 );
503 let mut evolution_rxs =
504 self.add_local_lane(local_star.clone(), second_star).await?;
505 proto_lane_evolution_rxs.append(&mut evolution_rxs);
506 }
507 ConstellationSelector::AnyWithGatewayInsideMachine(machine_name) => {
508 let host_address =
509 layout.get_machine_host_adddress(machine_name.clone());
510 let star_ctrl = self
511 .star_controllers
512 .get(local_star.clone())
513 .await?
514 .ok_or("expected local star to have star_ctrl")?;
515 let proto_lane_evolution_rx = self
516 .add_client_side_lane_ctrl(
517 star_ctrl,
518 host_address,
519 lane.star_selector.clone(),
520 true,
521 OnCloseAction::Remove
522 )
523 .await?;
524 proto_lane_evolution_rxs.push(proto_lane_evolution_rx);
525 }
526 }
527 }
528 }
529 }
530
531 let proto_lane_evolutions =
532 join_all(proto_lane_evolution_rxs.iter_mut().map(|x| x.recv())).await;
533
534 for result in proto_lane_evolutions {
535 result??;
536 }
537
538 constellation_broadcaster.send(ConstellationBroadcast::Status(
540 ConstellationStatus::Assembled,
541 ));
542
543 let evolutions = join_all(evolve_rxs).await;
544
545 for evolve in evolutions {
546 if let Ok(evolve) = evolve {
547 evolve.controller.surface_api.init();
548 } else if let Err(error) = evolve {
549 return Err(error.to_string().into());
550 }
551 }
552
553 let mut ready_futures = vec![];
554 for star_template in &layout.template.stars {
555 let machine = layout
556 .handles_to_machine
557 .get(&star_template.handle)
558 .ok_or(format!(
559 "expected machine mapping for star template handle: {}",
560 star_template.handle.to_string()
561 ))?;
562 if self.name == *machine {
563 let local_star = StarInConstellationTemplateHandle::new(
564 name.clone(),
565 star_template.handle.clone(),
566 );
567 let star_ctrl =
568 self.star_controllers
569 .get(local_star.clone())
570 .await?
571 .ok_or(format!(
572 "expected star controller: {}",
573 local_star.to_string()
574 ))?;
575 let (tx, rx) = oneshot::channel();
576 star_ctrl
577 .star_tx
578 .send(StarCommand::GetStatusListener(tx))
579 .await;
580 let mut star_status_receiver = rx.await?;
581 let (ready_status_tx, ready_status_rx) = oneshot::channel();
582 tokio::spawn(async move {
583 while let Result::Ok(status) = star_status_receiver.recv().await {
584 if status == StarStatus::Ready {
585 ready_status_tx.send(());
586 break;
587 }
588 }
589 });
590 ready_futures.push(ready_status_rx);
591 }
592 }
593
594 join_all(ready_futures).await;
596
597 Ok(())
598 }
599
600 fn listen(&mut self, result_tx: oneshot::Sender<Result<(), Error>>) {
601 {
602 let mut inner_flags = self.inner_flags.lock().unwrap();
603 let flags = inner_flags.get_mut();
604
605 if flags.listening {
606 result_tx.send(Ok(()));
607 return;
608 }
609 flags.listening = true;
610 }
611
612 {
613 let _port = self.port.clone();
614 let _inner_flags = self.inner_flags.clone();
615
616 }
621
622 let port = self.port.clone();
623 let command_tx = self.command_tx.clone();
624 let flags = self.inner_flags.clone();
625 tokio::spawn(async move {
626 match std::net::TcpListener::bind(format!("127.0.0.1:{}", port)) {
627 Ok(std_listener) => {
628 let listener = TcpListener::from_std(std_listener).unwrap();
629 result_tx.send(Ok(()));
630 while let Ok((stream, _)) = listener.accept().await {
631 {
632 let mut flags = flags.lock().unwrap();
633 let flags = flags.get_mut();
634 if flags.shutdown {
635 drop(listener);
636 return;
637 }
638 }
639 let _ok = command_tx
640 .send(StarlaneCommand::AddStream(stream))
641 .await
642 .is_ok();
643 tokio::time::sleep(Duration::from_secs(0)).await;
644 }
645 }
646 Err(error) => {
647 error!("FATAL: could not setup TcpListener {}", error);
648 result_tx.send(Err(error.into()));
649 }
650 }
651 });
652 }
653
654 pub fn caches(&self) -> Result<Arc<ProtoArtifactCachesFactory>, Error> {
655 Ok(self
656 .artifact_caches
657 .as_ref()
658 .ok_or("expected caches to be set")?
659 .clone())
660 }
661
662 async fn add_local_lane(
663 &mut self,
664 local: StarInConstellationTemplateHandle,
665 second: StarInConstellationTemplateHandle,
666 ) -> Result<Vec<broadcast::Receiver<Result<(), Error>>>, Error> {
667 let (high, low) = crate::util::sort(local, second)?;
668
669 let high_star_ctrl = {
670 let high_star_ctrl = self.star_controllers.get(high.clone()).await?;
671 match high_star_ctrl {
672 None => {
673 return Err(format!(
674 "lane cannot construct. missing local star key: {}",
675 high.star.to_string()
676 )
677 .into());
678 }
679 Some(high_star_ctrl) => high_star_ctrl.clone(),
680 }
681 };
682
683 let low_star_ctrl = {
684 let low_star_ctrl = self.star_controllers.get(low.clone()).await?;
685 match low_star_ctrl {
686 None => {
687 return Err(format!(
688 "lane cannot construct. missing second star key: {}",
689 low.star.to_string()
690 )
691 .into());
692 }
693 Some(low_star_ctrl) => low_star_ctrl.clone(),
694 }
695 };
696 self.add_local_lane_ctrl(high_star_ctrl, low_star_ctrl)
697 .await
698 }
699
700 async fn add_local_lane_ctrl(
701 &mut self,
702 high_star_ctrl: StarController,
703 low_star_ctrl: StarController,
704 ) -> Result<Vec<broadcast::Receiver<Result<(), Error>>>, Error> {
705 let high_lane = ProtoLaneEnd::new(Option::None, OnCloseAction::Remove );
706 let low_lane = ProtoLaneEnd::new(Option::None, OnCloseAction::Remove );
707 let rtn = vec![high_lane.get_evoltion_rx(), low_lane.get_evoltion_rx()];
708 let connector = LocalTunnelConnector::new(&high_lane, &low_lane).await?;
709 high_star_ctrl
710 .star_tx
711 .send(StarCommand::AddProtoLaneEndpoint(high_lane))
712 .await?;
713 low_star_ctrl
714 .star_tx
715 .send(StarCommand::AddProtoLaneEndpoint(low_lane))
716 .await?;
717 high_star_ctrl
718 .star_tx
719 .send(StarCommand::AddConnectorController(connector))
720 .await?;
721
722 Ok(rtn)
723 }
724
725 async fn add_server_side_lane_ctrl(
726 &mut self,
727 low_star_ctrl: StarController,
728 stream: TcpStream,
729 on_close_action: OnCloseAction
730 ) -> Result<broadcast::Receiver<Result<(), Error>>, Error> {
731 let low_lane = ProtoLaneEnd::new(Option::None, on_close_action );
732 let rtn = low_lane.get_evoltion_rx();
733
734 let connector_ctrl = ServerSideTunnelConnector::new(&low_lane, stream).await?;
735
736 low_star_ctrl
737 .star_tx
738 .send(StarCommand::AddProtoLaneEndpoint(low_lane))
739 .await?;
740
741 low_star_ctrl
742 .star_tx
743 .send(StarCommand::AddConnectorController(connector_ctrl))
744 .await?;
745
746 Ok(rtn)
747 }
748
749 async fn add_client_side_lane_ctrl(
750 &mut self,
751 star_ctrl: StarController,
752 host_address: String,
753 selector: StarInConstellationTemplateSelector,
754 key_requestor: bool,
755 on_close_action: OnCloseAction
756
757 ) -> Result<broadcast::Receiver<Result<(), Error>>, Error> {
758 let mut lane = ProtoLaneEnd::new(Option::None, on_close_action);
759 lane.key_requestor = key_requestor;
760
761 let rtn = lane.get_evoltion_rx();
762
763 let connector = ClientSideTunnelConnector::new(&lane, host_address, selector).await?;
764
765 star_ctrl
766 .star_tx
767 .send(StarCommand::AddProtoLaneEndpoint(lane))
768 .await?;
769
770 star_ctrl
771 .star_tx
772 .send(StarCommand::AddConnectorController(connector))
773 .await?;
774
775 Ok(rtn)
776 }
777
778 fn unlisten(inner_flags: Arc<Mutex<Cell<StarlaneInnerFlags>>>, port: usize) {
779 {
780 let mut flags = inner_flags.lock().unwrap();
781 flags.get_mut().shutdown = true;
782 }
783 std::net::TcpStream::connect(format!("localhost:{}", port));
784 }
785}
786
787impl Drop for StarlaneMachineRunner {
788 fn drop(&mut self) {
789 {
790 let mut flags = self.inner_flags.lock().unwrap();
791
792 let flags_mut = flags.get_mut();
793
794 if !flags_mut.shutdown {
795 warn!("dropping Starlane( {} ) unexpectedly", self.name);
796 }
797
798 if !flags_mut.listening {
799 Self::unlisten(self.inner_flags.clone(), self.port.clone());
800 }
801 }
802 }
803}
804
805#[derive(Clone, Serialize, Deserialize)]
806pub struct VersionFrame {
807 product: String,
808 version: String,
809}
810
811#[derive(strum_macros::Display)]
812pub enum StarlaneCommand {
813 ConstellationCreate(ConstellationCreate),
814 StarlaneApiSelectBest(oneshot::Sender<Result<StarlaneApi, Error>>),
815 Listen(oneshot::Sender<Result<(), Error>>),
816 AddStream(TcpStream),
817 GetProtoArtifactCachesFactory(oneshot::Sender<Option<Arc<ProtoArtifactCachesFactory>>>),
818 Shutdown,
819}
820
821pub struct StarlaneApiRequestByKey {
822 pub star: StarKey,
823 pub tx: oneshot::Sender<StarlaneApi>,
824}
825
826pub struct StarlaneApiRequest {
827 pub selector: StarSelector,
828 pub tx: oneshot::Sender<StarlaneApi>,
829}
830
831impl StarlaneApiRequest {
832 pub fn new(selector: StarSelector) -> (Self, oneshot::Receiver<StarlaneApi>) {
833 let (tx, rx) = oneshot::channel();
834 (
835 Self {
836 selector: selector,
837 tx: tx,
838 },
839 rx,
840 )
841 }
842}
843
844pub struct ConstellationCreate {
845 name: String,
846 layout: ConstellationLayout,
847 tx: oneshot::Sender<Result<(), Error>>,
848 machine: StarlaneMachine,
849}
850
851impl ConstellationCreate {
852 pub fn new(
853 layout: ConstellationLayout,
854 name: String,
855 machine: StarlaneMachine,
856 ) -> (Self, oneshot::Receiver<Result<(), Error>>) {
857 let (tx, rx) = oneshot::channel();
858 (
859 ConstellationCreate {
860 name: name,
861 layout: layout,
862 tx: tx,
863 machine: machine,
864 },
865 rx,
866 )
867 }
868}
869
870pub enum StarAddress {
871 Local,
872}
873
874#[derive(Clone)]
875struct StarlaneInnerFlags {
876 pub shutdown: bool,
877 pub listening: bool,
878}
879
880impl StarlaneInnerFlags {
881 pub fn new() -> Self {
882 Self {
883 shutdown: false,
884 listening: false,
885 }
886 }
887}
888
889#[cfg(test)]
890mod test {
891 use std::convert::TryInto;
892 use std::fs;
893 use std::fs::File;
894 use std::io::Read;
895 use std::str::FromStr;
896 use std::sync::Arc;
897
898 use starlane_resources::ArtifactBundlePath;
899 use tokio::runtime::Runtime;
900 use tokio::sync::oneshot;
901 use tokio::sync::oneshot::error::RecvError;
902 use tokio::time::timeout;
903 use tokio::time::Duration;
904 use tracing::dispatcher::set_global_default;
905 use tracing_subscriber::FmtSubscriber;
906
907 use crate::artifact::ArtifactLocation;
908 use crate::error::Error;
909 use crate::logger::{
910 Flag, Flags, Log, LogAggregate, ProtoStarLog, ProtoStarLogPayload, StarFlag, StarLog,
911 StarLogPayload,
912 };
913 use starlane_resources::message::Fail;
914 use crate::names::Name;
915 use crate::permissions::Authentication;
916 use crate::resource::ArtifactBundleAddress;
917 use crate::resource::{ResourceAddress};
918 use crate::space::CreateAppControllerFail;
919 use crate::star::{StarController, StarInfo, StarKey, StarKind};
920 use crate::starlane::api::SubSpaceApi;
921 use crate::starlane::{
922 ConstellationCreate, StarlaneApiRequest, StarlaneCommand, StarlaneMachine,
923 StarlaneMachineRunner,
924 };
925 use crate::template::{ConstellationLayout, ConstellationTemplate};
926
927 #[test]
928 #[instrument]
929 pub fn tracing() {
930 let subscriber = FmtSubscriber::default();
931 set_global_default(subscriber.into()).expect("setting global default failed");
932 info!("tracing works!");
933 }
934
935
936 #[test]
937 pub fn mechtron() {
938println!("Mechtron..");
939 let subscriber = FmtSubscriber::default();
940 set_global_default(subscriber.into()).expect("setting global default failed");
941
942 let data_dir = "tmp/data";
943 let cache_dir = "tmp/cache";
944 fs::remove_dir_all(data_dir).unwrap_or_default();
945 fs::remove_dir_all(cache_dir).unwrap_or_default();
946 std::env::set_var("STARLANE_DATA", data_dir);
947 std::env::set_var("STARLANE_CACHE", cache_dir);
948
949 println!("Hello");
950 let rt = Runtime::new().unwrap();
951 rt.block_on(async {
952println!("block ON..");
953 async fn test() -> Result<(),Error> {
954 let mut starlane = StarlaneMachine::new("server".to_string()).unwrap();
955
956 starlane.listen().await.unwrap();
957
958 starlane.create_constellation("standalone", ConstellationLayout::standalone().unwrap()) .await?;
959
960println!("POST CREATE CONSTELLATION");
961
962 let starlane_api = starlane.get_starlane_api().await.unwrap();
963
964 let sub_space_api = starlane_api.get_sub_space( ResourceAddress::from_str("hyperspace:starlane<SubSpace>").unwrap() .into(), ) .await?;
965
966 {
967 let mut file =
968 File::open("../../wasm/appy/appy.zip").unwrap();
969 let mut data = vec![];
970 file.read_to_end(&mut data).unwrap();
971 let address = ResourceAddress::from_str("hyperspace:starlane:appy:1.0.0<ArtifactBundle>")?;
972 let mut creation = sub_space_api
973 .create_artifact_bundle_versions(address.parent().unwrap().name().as_str())?;
974 let artifact_bundle_versions_api = creation.submit().await?;
975
976 let version = semver::Version::from_str(address.name().as_str())?;
977 let mut creation = artifact_bundle_versions_api.create_artifact_bundle(
978 version,
979 Arc::new(data),
980 )?;
981 creation.submit().await?;
982 }
983println!("appy bundle published");
984
985
986
987 let config = ResourceAddress::from_str("hyperspace:starlane:appy:1.0.0:/app/appy-config.yaml<Artifact>")?;
988 let app_api = sub_space_api.create_app("appy", config.try_into()? )?.submit().await?;
989
990println!("app created: {}", app_api.key().to_string() );
991
992 std::thread::sleep(std::time::Duration::from_secs(10));
993
994 starlane.shutdown();
995
996 std::thread::sleep(std::time::Duration::from_secs(1));
997
998 Ok(())
999 }
1000 match test().await {
1001 Ok(_) => {}
1002 Err(error) => {
1003 eprintln!("{}",error.to_string());
1004 assert!(false);
1005 }
1006 }
1007 });
1008 }
1009
1010 }