pub fn spawn_admin_interface_task<A: InterfaceApi>(
handle: ListenerHandle,
listener: impl Stream<Item = ListenerItem> + Send + 'static,
api: A,
stop_rx: Receiver<()>
) -> InterfaceResult<JoinHandle<ManagedTaskResult>>
Expand description
Create an admin interface, which only receives `AdminRequest` messages from the external client.
Examples found in repository?
src/conductor/conductor.rs (lines 394-399)
372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441
/// Spawns one admin interface task per entry in `configs`, registers the
/// resulting tasks with the task manager, and records the port of each
/// interface on the conductor.
///
/// # Errors
///
/// Returns an error if any listener or interface task fails to spawn, or if
/// registering a task via `manage_task` fails. No ports are recorded in
/// either case.
///
/// # Panics
///
/// Panics with "Task manager not started yet" if the task manager slot is
/// empty when the stop broadcaster is looked up.
pub(crate) async fn add_admin_interfaces(
    self: Arc<Self>,
    configs: Vec<AdminInterfaceConfig>,
) -> ConductorResult<()> {
    let api = RealAdminInterfaceApi::new(self.clone());
    let stop_broadcaster = self.task_manager.share_ref(|tm| {
        tm.as_ref()
            .expect("Task manager not started yet")
            .task_stop_broadcaster()
            .clone()
    });

    // One future per config; each resolves to `(port, task handle)`.
    let pending = configs
        .into_iter()
        .map(|AdminInterfaceConfig { driver, .. }| {
            let api = api.clone();
            let stop_broadcaster = stop_broadcaster.clone();
            async move {
                match driver {
                    InterfaceDriver::Websocket {
                        port: requested_port,
                    } => {
                        let (listener_handle, listener) =
                            spawn_websocket_listener(requested_port).await?;
                        // Prefer the port reported by the listener, falling back to
                        // the configured one when none is reported.
                        let bound_port = listener_handle
                            .local_addr()
                            .port()
                            .unwrap_or(requested_port);
                        let task: ManagedTaskHandle = spawn_admin_interface_task(
                            listener_handle,
                            listener,
                            api.clone(),
                            stop_broadcaster.subscribe(),
                        )?;
                        InterfaceResult::Ok((bound_port, task))
                    }
                }
            }
        });

    // Drive all spawn futures to completion, then bail out with the first
    // error (boxed) if any interface could not be created.
    let spawned = future::join_all(pending)
        .await
        .into_iter()
        .collect::<Result<Vec<_>, _>>()
        .map_err(Box::new)?;

    // Register the keepalive task first, to ensure the conductor doesn't
    // shut down in the absence of other "real" tasks.
    self.manage_task(ManagedTaskAdd::ignore(
        tokio::spawn(keep_alive_task(stop_broadcaster.subscribe())),
        "keepalive task",
    ))
    .await?;

    // Hand every interface task over to the task manager, remembering its port.
    let mut bound_ports = Vec::with_capacity(spawned.len());
    for (port, task) in spawned {
        bound_ports.push(port);
        let label = format!("admin interface, port {}", port);
        self.manage_task(ManagedTaskAdd::ignore(task, &label))
            .await?;
    }

    // Ports are only recorded once every task has been registered successfully.
    for port in bound_ports {
        self.add_admin_port(port);
    }

    Ok(())
}