use tokio::{
select,
sync::{broadcast::Sender, mpsc::UnboundedReceiver},
task::{self, spawn_blocking},
};
use crate::{
dbus_api::create_dbus_handlers,
engine::{LockableEngine, UdevEngineEvent},
stratis::{StratisError, StratisResult},
};
pub async fn setup(
engine: LockableEngine,
receiver: UnboundedReceiver<UdevEngineEvent>,
trigger: Sender<()>,
) -> StratisResult<()> {
let (mut conn, mut udev, mut tree) = spawn_blocking(move || {
create_dbus_handlers(engine.clone(), receiver, trigger)
.map(|(conn, udev, tree)| {
let mutex_lock = engine.blocking_lock();
for (pool_name, pool_uuid, pool) in mutex_lock.pools() {
udev.register_pool(&pool_name, pool_uuid, pool)
}
info!("D-Bus API is available");
(conn, udev, tree)
})
.map_err(StratisError::from)
})
.await
.map_err(StratisError::from)
.and_then(|res| res)?;
let mut tree_handle = task::spawn_blocking(move || {
if let Err(e) = tree.process_dbus_actions() {
error!(
"Failed to process D-Bus object path addition or removal: {}; \
exiting D-Bus thread",
e,
);
return;
}
});
let mut conn_handle = task::spawn_blocking(move || conn.process_dbus_requests());
let mut udev_handle = task::spawn(async move {
loop {
if let Err(e) = udev.handle_udev_event().await {
error!(
"Failed to process udev event in the D-Bus layer: {}; exiting D-Bus thread",
e,
);
return;
}
}
});
select! {
res = &mut tree_handle => {
error!("The tree handling thread exited...");
res.map_err(|e| StratisError::Error(e.to_string()))
}
res = &mut conn_handle => {
error!("The D-Bus request thread exited...");
res.map_err(|e| StratisError::Error(e.to_string())).and_then(|res| res)
}
res = &mut udev_handle => {
error!("The udev processing thread exited...");
res.map_err(|e| StratisError::Error(e.to_string()))
}
}
}