#![allow(dead_code)]
use std::sync::OnceLock;
use base::{
consts::{MAX_HEART_BEAT_ELAPSE, MIN_HEART_BEAT_ELAPSE},
enums::UnitDBusLevel,
proxy::{DisEnAbleUnitFiles, DisEnAbleUnitFilesResponse},
};
use futures_util::stream::StreamExt;
use tokio::{
task::JoinHandle,
time::{self, Duration},
};
use tracing::{debug, info, warn};
use zbus::proxy;
use zvariant::OwnedObjectPath;
use crate::{
errors::SystemdErrors,
sysdbus::{get_blocking_connection, get_connection, run_context},
};
#[proxy(
interface = "io.github.plrigaux.SysDManager",
default_service = "io.github.plrigaux.SysDManager",
default_path = "/io/github/plrigaux/SysDManager"
)]
pub trait SysDManagerComLink {
fn start_unit(&self, unit_name: &str, mode: &str) -> zbus::fdo::Result<OwnedObjectPath>;
fn stop_unit(&self, unit_name: &str, mode: &str) -> zbus::fdo::Result<OwnedObjectPath>;
fn restart_unit(&self, unit_name: &str, mode: &str) -> zbus::fdo::Result<OwnedObjectPath>;
fn reload_unit(&self, unit_name: &str, mode: &str) -> zbus::fdo::Result<OwnedObjectPath>;
fn clean_unit(&self, unit_name: &str, what: &[&str]) -> zbus::Result<()>;
fn freeze_unit(&self, unit_name: &str) -> zbus::fdo::Result<()>;
fn thaw_unit(&self, unit_name: &str) -> zbus::fdo::Result<()>;
fn reload(&self) -> zbus::fdo::Result<()>;
fn create_drop_in(
&mut self,
runtime: bool,
unit_name: &str,
file_name: &str,
content: &str,
) -> zbus::fdo::Result<()>;
fn save_file(&mut self, file_name: &str, content: &str) -> zbus::fdo::Result<u64>;
fn revert_unit_files(&self, file_names: &[&str]) -> zbus::fdo::Result<Vec<DisEnAbleUnitFiles>>;
fn enable_unit_files_with_flags(
&mut self,
files: &[&str],
flags: u64,
) -> zbus::fdo::Result<DisEnAbleUnitFilesResponse>;
fn disable_unit_files_with_flags(
&mut self,
files: &[&str],
flags: u64,
) -> zbus::fdo::Result<DisEnAbleUnitFilesResponse>;
#[zbus(signal)]
fn hello(msg: String) -> zbus::fdo::Result<()>;
fn heart_beat(&self) -> zbus::fdo::Result<u64>;
}
fn ensure_proxy_up() {
}
fn get_proxy<'a>() -> Result<SysDManagerComLinkProxyBlocking<'a>, SystemdErrors> {
let destination = run_context().destination_address();
let connection = get_blocking_connection(UnitDBusLevel::System)?;
info!("BusName Destination {}", destination);
let proxy = SysDManagerComLinkProxyBlocking::builder(&connection)
.destination(destination)?
.build()?;
Ok(proxy)
}
pub(crate) async fn get_proxy_async<'a>() -> Result<SysDManagerComLinkProxy<'a>, SystemdErrors> {
let destination = super::RUN_CONTEXT
.get()
.expect("Supposed to be init")
.destination_address();
let connection = get_connection(UnitDBusLevel::System).await?;
info!("BusName Destination {}", destination);
let proxy = SysDManagerComLinkProxy::builder(&connection)
.destination(destination)?
.build()
.await?;
Ok(proxy)
}
pub fn start_unit(unit_name: &str, mode: &str) -> Result<String, SystemdErrors> {
let proxy = get_proxy()?;
let s = proxy.start_unit(unit_name, mode)?;
Ok(s.to_string())
}
pub fn stop_unit(unit_name: &str, mode: &str) -> Result<String, SystemdErrors> {
let proxy = get_proxy()?;
let s = proxy.stop_unit(unit_name, mode)?;
Ok(s.to_string())
}
pub fn restart_unit(unit_name: &str, mode: &str) -> Result<String, SystemdErrors> {
let proxy = get_proxy()?;
let s = proxy.restart_unit(unit_name, mode)?;
Ok(s.to_string())
}
pub fn clean_unit(unit_name: &str, what: &[&str]) -> Result<(), SystemdErrors> {
let proxy = get_proxy()?;
proxy.clean_unit(unit_name, what)?;
Ok(())
}
pub fn freeze_unit(unit_name: &str) -> Result<(), SystemdErrors> {
let proxy = get_proxy()?;
proxy.freeze_unit(unit_name)?;
Ok(())
}
pub fn thaw_unit(unit_name: &str) -> Result<(), SystemdErrors> {
let proxy: SysDManagerComLinkProxyBlocking<'_> = get_proxy()?;
proxy.thaw_unit(unit_name)?;
Ok(())
}
pub async fn reload() -> Result<(), SystemdErrors> {
let proxy = get_proxy_async().await?;
proxy.reload().await?;
Ok(())
}
fn extract_job_id(job: &str) -> Option<u32> {
job.rsplit_once('/')
.and_then(|(_, id)| id.parse::<u32>().ok())
}
pub async fn lazy_start_proxy_async() -> Result<(), SystemdErrors> {
crate::sysdbus::init_proxy_async2().await?;
Ok(())
}
pub(crate) async fn wait_hello(mut hello_stream: HelloStream) -> Result<(), SystemdErrors> {
if let Some(msg) = hello_stream.next().await {
let args = msg.args()?;
info!("Hello Proxy Args {:?}", args);
}
Ok(())
}
pub fn lazy_start_proxy_block() -> Result<(), SystemdErrors> {
crate::runtime().block_on(async move {
warn!("lazy 1");
lazy_start_proxy_async().await;
warn!("lazy 2");
});
Ok(())
}
static HEART_BEAT_HANDLE: OnceLock<JoinHandle<Result<(), SystemdErrors>>> = OnceLock::new();
pub(crate) fn start_heart_beat() {
if let Some(join_handle) = HEART_BEAT_HANDLE.get()
&& !join_handle.is_finished()
{
warn!("There is already an heart beat thread running");
} else {
info!("Starting Heart Beat");
let handle = tokio::spawn(send_heart_beat());
HEART_BEAT_HANDLE.set(handle);
}
}
async fn send_heart_beat() -> Result<(), SystemdErrors> {
let proxy = get_proxy_async().await?;
loop {
match proxy.heart_beat().await {
Ok(delay) => {
debug!("Heath Beat delay {delay} millis");
let delay = delay.clamp(MIN_HEART_BEAT_ELAPSE, MAX_HEART_BEAT_ELAPSE);
time::sleep(Duration::from_millis(delay)).await;
}
Err(err) => {
warn!("Send Heart Beat Error: {err:?}");
return Err(err.into());
}
}
}
}
#[macro_export]
macro_rules! proxy_call_blocking {
($func:ident, $($param:expr),+) => {
match $crate::to_proxy::$func($($param),+) {
Ok(ok) => Ok(ok),
Err(SystemdErrors::ZFdoServiceUnknowm(msg)) => {
warn!("Blocking ServiceUnkown: {:?} Func: {}", msg, stringify!($func));
$crate::to_proxy::lazy_start_proxy_block();
$crate::to_proxy::$func($($param),+)
},
Err(err) => Err(err)
}
}
}
#[macro_export]
macro_rules! proxy_call_async {
($func:ident) => {
proxy_call_async!($func,)
};
($func:ident, $($param:expr),*) => {
match $crate::to_proxy::$func($($param),*).await {
Ok(ok) => Ok(ok),
Err(SystemdErrors::ZFdoServiceUnknowm(msg)) => {
warn!("Async ServiceUnkown: {:?} Function: {}", msg, stringify!($func));
$crate::to_proxy::lazy_start_proxy_async().await;
$crate::to_proxy::$func($($param),*).await
},
Err(err) => Err(err)
}
}
}
pub(crate) async fn create_drop_in(
runtime: bool,
unit_name: &str,
file_path: &str,
content: &str,
) -> Result<(), SystemdErrors> {
let mut proxy = get_proxy_async().await?;
proxy
.create_drop_in(runtime, unit_name, file_path, content)
.await
.map_err(|e| e.into())
}
pub async fn save_file(file_path: &str, content: &str) -> Result<u64, SystemdErrors> {
let mut proxy = get_proxy_async().await?;
proxy
.save_file(file_path, content)
.await
.map_err(|e| e.into())
}
pub async fn revert_unit_files(
unit_names: &[&str],
) -> Result<Vec<DisEnAbleUnitFiles>, SystemdErrors> {
let proxy = get_proxy_async().await?;
proxy
.revert_unit_files(unit_names)
.await
.map_err(|e| e.into())
}
pub fn enable_unit_files_with_flags(
unit_files: &[&str],
flags: u64,
) -> Result<DisEnAbleUnitFilesResponse, SystemdErrors> {
let mut proxy: SysDManagerComLinkProxyBlocking<'_> = get_proxy()?;
proxy
.enable_unit_files_with_flags(unit_files, flags)
.map_err(|err| err.into())
}
pub fn disable_unit_files_with_flags(
unit_files: &[&str],
flags: u64,
) -> Result<DisEnAbleUnitFilesResponse, SystemdErrors> {
let mut proxy: SysDManagerComLinkProxyBlocking<'_> = get_proxy()?;
proxy
.disable_unit_files_with_flags(unit_files, flags)
.map_err(|err| err.into())
}