use crate::services::Initial;
use crate::tools::ErrLogger;
use crate::{EResult, Error};
use elbus::client::AsyncClient;
use elbus::rpc::{Rpc, RpcClient, RpcError, RpcEvent, RpcHandlers, RpcResult};
use elbus::{Frame, FrameKind, QoS};
use log::{debug, error, info};
use std::collections::HashMap;
use std::process::Stdio;
use std::sync::atomic;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::io::{BufReader, BufWriter};
use tokio::process::Command;
use tokio::sync::{Mutex, RwLock};
use tokio::task::JoinHandle;
use tokio::time::sleep;
/// Pause the restart loop takes after a service process has finished (or failed to launch)
/// before launching it again.
const RESTART_PERIOD: Duration = Duration::from_millis(1_000);
struct ServiceRuntimeData {
pid: u32,
tasks: Vec<JoinHandle<()>>,
shutdown_timeout: Duration,
status: Arc<atomic::AtomicU8>,
}
impl Drop for ServiceRuntimeData {
fn drop(&mut self) {
for fut in &self.tasks {
fut.abort();
}
}
}
/// A managed service: its name, the restart-loop task and the data of the running instance.
struct Service {
    /// Service name; used as the RPC target when pinging and as the log line prefix.
    name: String,
    /// Runtime data of the current process instance, `None` while no instance is recorded.
    data: Arc<Mutex<Option<ServiceRuntimeData>>>,
    /// The restart loop in [`Service::runner`] keeps relaunching the process while `true`.
    active: Arc<atomic::AtomicBool>,
    /// Handle of the restart-loop task spawned by [`Service::start`].
    runner_fut: Option<JoinHandle<()>>,
}
impl Service {
    /// Creates an inactive service record with no running instance.
    fn new(name: &str) -> Self {
        Self {
            name: name.to_owned(),
            data: <_>::default(),
            active: <_>::default(),
            runner_fut: None,
        }
    }
    /// Spawns one instance of the service process and waits until it exits.
    ///
    /// The `initial` payload is serialized with `rmp_serde::to_vec_named` and written to
    /// the child's stdin. Stdout lines are logged at info level and stderr lines at error
    /// level, each prefixed with the service name. Two watchdogs are started:
    ///
    /// * a pinger which, after `startup_timeout`, calls the service's `test` RPC method
    ///   every `timeout` and kills the process tree on the first failure;
    /// * a readiness check which kills the process tree if the status beacon is still 0
    ///   once `startup_timeout` has elapsed.
    ///
    /// The runtime data (pid, tasks, shutdown timeout, status beacon) is stored in
    /// `service_data` while the process runs and cleared after it exits, which also
    /// aborts the watchdog and reader tasks.
    ///
    /// # Errors
    ///
    /// Returns an error if the process cannot be spawned, its pipes or pid are not
    /// available, the initial payload cannot be serialized or delivered, or waiting for
    /// the process fails.
    async fn launch(
        name: &str,
        command: &str,
        args: &[&str],
        initial: &Initial,
        rpc: Arc<RwLock<Option<RpcClient>>>,
        service_data: &Mutex<Option<ServiceRuntimeData>>,
    ) -> EResult<()> {
        // kill_on_drop is disabled: dropping `child` does not terminate the process,
        // termination is always done explicitly (kill_pstree / kill)
        let mut child = Command::new(command)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .kill_on_drop(false)
            .args(args)
            .spawn()?;
        let stdin = child
            .stdin
            .take()
            .ok_or_else(|| Error::io("unable to get stdin"))?;
        let stdout = child
            .stdout
            .take()
            .ok_or_else(|| Error::io("unable to get stdout"))?;
        let stderr = child
            .stderr
            .take()
            .ok_or_else(|| Error::io("unable to get stderr"))?;
        let mut stdout_reader = BufReader::new(stdout).lines();
        let mut stderr_reader = BufReader::new(stderr).lines();
        let pid = child
            .id()
            .ok_or_else(|| Error::io("unable to get process id"))?;
        // the writer is dropped at the end of the async block, closing the child's stdin
        let delivered = async {
            let mut stdin_writer = BufWriter::new(stdin);
            stdin_writer
                .write_all(&rmp_serde::to_vec_named(initial)?)
                .await?;
            stdin_writer.flush().await?;
            Ok::<(), Error>(())
        }
        .await;
        if let Err(e) = delivered {
            // without this the process would keep running unsupervised (kill_on_drop is
            // off and no runtime data has been recorded yet), while the runner starts a
            // new instance next to it
            bmart::process::kill_pstree(pid, None, true).await;
            // reap the direct child; we are failing anyway, so the result is irrelevant
            let _r = child.kill().await;
            return Err(e);
        }
        let startup_timeout = initial.startup_timeout();
        let shutdown_timeout = initial.shutdown_timeout();
        let timeout = initial.timeout();
        let svc_name = name.to_owned();
        {
            let mut r_data_c = service_data.lock().await;
            let pinger_fut = tokio::spawn(async move {
                /// Calls the service's `test` RPC method, failing on RPC error or timeout.
                async fn ping(
                    name: &str,
                    rpc: &RwLock<Option<RpcClient>>,
                    timeout: Duration,
                ) -> EResult<()> {
                    // the client is expected to be set: the `start` handler refuses to
                    // start services until `init` has stored it
                    tokio::time::timeout(
                        timeout,
                        rpc.read().await.as_ref().unwrap().call(
                            name,
                            "test",
                            elbus::empty_payload!(),
                            QoS::Processed,
                        ),
                    )
                    .await??;
                    Ok(())
                }
                sleep(startup_timeout).await;
                loop {
                    if ping(&svc_name, &rpc, timeout).await.log_err().is_err() {
                        bmart::process::kill_pstree(pid, Some(shutdown_timeout), true).await;
                        break;
                    }
                    sleep(timeout).await;
                }
            });
            let svc_name = name.to_owned();
            let stdout_fut = tokio::spawn(async move {
                while let Ok(Some(line)) = stdout_reader.next_line().await {
                    info!("{} {}", svc_name, line);
                }
            });
            let svc_name = name.to_owned();
            let stderr_fut = tokio::spawn(async move {
                while let Ok(Some(line)) = stderr_reader.next_line().await {
                    error!("{} {}", svc_name, line);
                }
            });
            // updated by the frame handler when the service broadcasts its status
            let status_beacon: Arc<atomic::AtomicU8> = <_>::default();
            let b = status_beacon.clone();
            let svc_name = name.to_owned();
            let ready_fut = tokio::spawn(async move {
                sleep(startup_timeout).await;
                if b.load(atomic::Ordering::SeqCst) == 0 {
                    error!("service {} is not ready, terminating", svc_name);
                    bmart::process::kill_pstree(pid, None, true).await;
                }
            });
            let r_data: ServiceRuntimeData = ServiceRuntimeData {
                pid,
                tasks: vec![pinger_fut, stdout_fut, stderr_fut, ready_fut],
                shutdown_timeout,
                status: status_beacon,
            };
            r_data_c.replace(r_data);
        }
        let result = child.wait().await;
        // dropping the runtime data aborts the helper tasks of this instance
        service_data.lock().await.take();
        result?;
        Ok(())
    }
    /// Restart loop: launches the service command again and again while `active` is set,
    /// pausing [`RESTART_PERIOD`] after every run. Launch failures are logged, not
    /// propagated.
    ///
    /// # Errors
    ///
    /// Returns an error if `initial.command()` contains no command at all.
    async fn runner(
        name: &str,
        initial: &Initial,
        rpc: Arc<RwLock<Option<RpcClient>>>,
        active: Arc<atomic::AtomicBool>,
        service_data: Arc<Mutex<Option<ServiceRuntimeData>>>,
    ) -> EResult<()> {
        // `split_whitespace` never yields empty pieces: repeated spaces no longer turn into
        // empty arguments, and a blank command is reported instead of being retried
        // forever as an empty program name (with `split(' ')` the first piece always
        // exists, so the check below could never fail)
        let mut parts = initial.command().split_whitespace();
        let command = parts
            .next()
            .ok_or_else(|| Error::invalid_data("command not specified"))?;
        let args: Vec<&str> = parts.collect();
        while active.load(atomic::Ordering::SeqCst) {
            let _r = Self::launch(name, command, &args, initial, rpc.clone(), &service_data)
                .await
                .map_err(|e| Error::io(format!("unable to launch {} service: {}", name, e)))
                .log_err();
            sleep(RESTART_PERIOD).await;
        }
        Ok(())
    }
    /// Marks the service active and spawns its restart loop as a background task;
    /// `initial` and `rpc` are moved into that task. Errors of the loop are logged.
    async fn start(&mut self, initial: Initial, rpc: Arc<RwLock<Option<RpcClient>>>) {
        self.active.store(true, atomic::Ordering::SeqCst);
        let name = self.name.clone();
        let active = self.active.clone();
        let data = self.data.clone();
        let fut = tokio::spawn(async move {
            let _r = Self::runner(&name, &initial, rpc, active, data)
                .await
                .log_err();
        });
        self.runner_fut.replace(fut);
    }
    /// Stops the service: clears the active flag, aborts the restart loop and, if an
    /// instance is recorded, kills its process tree allowing `shutdown_timeout` for it.
    async fn stop(&self) {
        self.active.store(false, atomic::Ordering::SeqCst);
        let data = self.data.lock().await.take();
        if let Some(ref fut) = self.runner_fut {
            fut.abort();
        }
        if let Some(service_data) = data {
            bmart::process::kill_pstree(
                service_data.pid,
                Some(service_data.shutdown_timeout),
                true,
            )
            .await;
        }
    }
}
/// RPC handlers of the service launcher.
#[derive(Default)]
struct Handlers {
    /// Managed services by name; `handle_frame` matches broadcast sender ids against
    /// these keys.
    services: Mutex<HashMap<String, Service>>,
    /// Slot for the launcher's own RPC client, filled in by `init`; services are pinged
    /// through it.
    rpc: Arc<RwLock<Option<RpcClient>>>,
}
#[async_trait::async_trait]
impl RpcHandlers for Handlers {
    /// Handles launcher RPC calls:
    ///
    /// * `list` — pid and status of every managed service (MessagePack map by name);
    /// * `start` — stops a running instance of the same name, creates the data directory
    ///   and starts the service; fails if the RPC client is not initialized yet;
    /// * `stop` — stops the service;
    /// * `stop_and_purge` — stops the service and removes its data directory.
    ///
    /// Any other method yields a method-not-found RPC error.
    async fn handle_call(&self, event: RpcEvent) -> RpcResult {
        /// Removes a service from the map and, if it was present, stops it.
        macro_rules! stop_svc {
            ($name: expr, $services: expr) => {{
                let svc = $services.remove($name);
                if let Some(service) = svc {
                    service.stop().await;
                }
            }};
        }
        match event.parse_method()? {
            "list" => {
                use crate::services::ServiceStatus;
                let mut result: HashMap<&str, ServiceStatus> = HashMap::new();
                let services = self.services.lock().await;
                for (s, v) in services.iter() {
                    result.insert(
                        s,
                        // no recorded instance: the service is between (re)launches
                        v.data.lock().await.as_ref().map_or_else(
                            || ServiceStatus {
                                pid: None,
                                status: crate::services::Status::Starting,
                            },
                            |d| ServiceStatus {
                                pid: Some(d.pid),
                                status: d.status.load(atomic::Ordering::SeqCst).into(),
                            },
                        ),
                    );
                }
                Ok(Some(rmp_serde::to_vec_named(&result)?))
            }
            "start" => {
                // services are pinged through this client, so it must exist first
                if self.rpc.read().await.is_none() {
                    return Err(Error::failed("rpc not initialized yet").into());
                }
                let mut services = self.services.lock().await;
                let p: crate::services::PayloadStartStop =
                    rmp_serde::from_read_ref(event.payload())?;
                debug!("starting service {}", p.name);
                stop_svc!(&p.name, services);
                let mut service = Service::new(&p.name);
                tokio::fs::create_dir_all(&p.initial.data_path()).await?;
                service.start(p.initial, self.rpc.clone()).await;
                services.insert(p.name, service);
                Ok(None)
            }
            "stop" => {
                let p: crate::services::PayloadStartStop =
                    rmp_serde::from_read_ref(event.payload())?;
                debug!("stopping service {}", p.name);
                stop_svc!(&p.name, self.services.lock().await);
                Ok(None)
            }
            "stop_and_purge" => {
                let p: crate::services::PayloadStartStop =
                    rmp_serde::from_read_ref(event.payload())?;
                debug!("stopping service {}", p.name);
                stop_svc!(&p.name, self.services.lock().await);
                debug!("purging service data {}", p.name);
                // purging is idempotent: a missing data directory (e.g. the service was
                // never started here) means there is nothing left to remove
                if let Err(e) = tokio::fs::remove_dir_all(&p.initial.data_path()).await {
                    if e.kind() != std::io::ErrorKind::NotFound {
                        return Err(e.into());
                    }
                }
                Ok(None)
            }
            _ => Err(RpcError::method(None)),
        }
    }
    /// Notifications are ignored.
    async fn handle_notification(&self, _event: RpcEvent) {}
    /// Stores status broadcasts sent by `eva.svc.*` senders into the status beacon of the
    /// matching running service; frames that do not decode are ignored.
    async fn handle_frame(&self, frame: Frame) {
        if frame.kind() == FrameKind::Broadcast && frame.sender().starts_with("eva.svc.") {
            if let Ok(status) = rmp_serde::from_slice::<crate::services::ServiceStatusBroadcastEvent>(
                frame.payload(),
            ) {
                if let Some(svc) = self.services.lock().await.get(frame.sender()) {
                    if let Some(data) = svc.data.lock().await.as_ref() {
                        data.status
                            .store(status.status as u8, atomic::Ordering::SeqCst);
                    }
                }
            }
        }
    }
}
pub async fn init<C>(client: C)
where
C: AsyncClient + 'static,
{
let handlers = Handlers::default();
let handlers_rpc = handlers.rpc.clone();
let rpc = RpcClient::new(client, handlers);
handlers_rpc.write().await.replace(rpc);
}