pub async fn spawn_app_interface_task<A: InterfaceApi>(
    port: u16,
    api: A,
    signal_broadcaster: Sender<Signal>,
    stop_rx: Receiver<()>
) -> InterfaceResult<(u16, JoinHandle<ManagedTaskResult>)>
Expand description

Create an App Interface, which includes the ability to receive signals from Cells via a broadcast channel

Examples found in repository?
src/conductor/conductor.rs (line 467)
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
        pub async fn add_app_interface(
            self: Arc<Self>,
            port: either::Either<u16, AppInterfaceId>,
        ) -> ConductorResult<u16> {
            let interface_id = match port {
                either::Either::Left(port) => AppInterfaceId::new(port),
                either::Either::Right(id) => id,
            };
            let port = interface_id.port();
            tracing::debug!("Attaching interface {}", port);
            let app_api = RealAppInterfaceApi::new(self.clone());
            // This receiver is thrown away because we can produce infinite new
            // receivers from the Sender
            let (signal_tx, _r) = tokio::sync::broadcast::channel(SIGNAL_BUFFER_SIZE);
            let stop_rx = self.task_manager.share_ref(|tm| {
                tm.as_ref()
                    .expect("Task manager not initialized")
                    .task_stop_broadcaster()
                    .subscribe()
            });
            let (port, task) = spawn_app_interface_task(port, app_api, signal_tx.clone(), stop_rx)
                .await
                .map_err(Box::new)?;
            // TODO: RELIABILITY: Handle this task by restarting it if it fails and log the error
            self.manage_task(ManagedTaskAdd::ignore(
                task,
                &format!("app interface, port {}", port),
            ))
            .await?;
            let interface = AppInterfaceRuntime::Websocket { signal_tx };

            self.app_interfaces.share_mut(|app_interfaces| {
                if app_interfaces.contains_key(&interface_id) {
                    return Err(ConductorError::AppInterfaceIdCollision(
                        interface_id.clone(),
                    ));
                }

                app_interfaces.insert(interface_id.clone(), interface);
                Ok(())
            })?;
            let config = AppInterfaceConfig::websocket(port);
            self.update_state(|mut state| {
                state.app_interfaces.insert(interface_id, config);
                Ok(state)
            })
            .await?;
            tracing::debug!("App interface added at port: {}", port);
            Ok(port)
        }