use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, mpsc};
use tokio::time::timeout;
use crate::client::client::{ChildProcessInfo, ServiceStatus, WhyBlocked, ZinitClient};
pub mod timeouts {
use std::time::Duration;
pub const QUICK_RPC: Duration = Duration::from_secs(5);
pub const STANDARD_OP: Duration = Duration::from_secs(10);
pub const LONG_OP: Duration = Duration::from_secs(30);
pub const VERY_LONG_OP: Duration = Duration::from_secs(60);
}
#[derive(Debug, Clone)]
pub enum Action {
FetchServices,
FetchLogs(String),
FetchChildren(String),
FetchWhy(String),
StartService(String),
StopService(String),
RestartService(String),
KillService(String, String), RemoveService(String),
HaltAndRemove(String),
Quit,
}
impl Action {
pub fn timeout(&self) -> Duration {
match self {
Action::FetchServices
| Action::FetchLogs(_)
| Action::FetchChildren(_)
| Action::FetchWhy(_) => timeouts::QUICK_RPC,
Action::StartService(_) | Action::RestartService(_) | Action::KillService(_, _) => {
timeouts::STANDARD_OP
}
Action::StopService(_) | Action::RemoveService(_) => timeouts::LONG_OP,
Action::HaltAndRemove(_) => timeouts::VERY_LONG_OP,
Action::Quit => Duration::from_secs(1),
}
}
pub fn display_name(&self) -> &'static str {
match self {
Action::FetchServices => "fetch",
Action::FetchLogs(_) => "logs",
Action::FetchChildren(_) => "children",
Action::FetchWhy(_) => "why",
Action::StartService(_) => "start",
Action::StopService(_) => "stop",
Action::RestartService(_) => "restart",
Action::KillService(_, _) => "kill",
Action::RemoveService(_) => "remove",
Action::HaltAndRemove(_) => "halt+remove",
Action::Quit => "quit",
}
}
}
#[derive(Clone, Debug)]
pub struct ServiceDisplayInfo {
pub name: String,
pub state: String,
pub pid: u32,
pub exit_code: Option<i32>,
pub error: Option<String>,
}
impl ServiceDisplayInfo {
pub fn from_status(status: ServiceStatus) -> Self {
Self {
name: status.name,
state: format!("{:?}", status.state).to_lowercase(),
pid: status.pid,
exit_code: status.exit_code,
error: status.error,
}
}
pub fn unknown(name: String) -> Self {
Self {
name,
state: "unknown".into(),
pid: 0,
exit_code: None,
error: None,
}
}
}
#[derive(Debug)]
pub enum ActionResult {
Services(Result<Vec<ServiceDisplayInfo>, String>),
Logs(String, Result<Vec<String>, String>),
Children(String, Result<Vec<ChildProcessInfo>, String>),
Why(String, Result<WhyBlocked, String>),
OperationComplete {
action: String,
service: String,
result: Result<(), String>,
},
Disconnected(String),
#[allow(dead_code)]
Reconnected,
}
#[derive(Debug, Clone)]
pub struct PendingOperation {
pub action: String,
pub service: String,
pub started: Instant,
}
pub struct ActionExecutor {
socket_path: PathBuf,
result_tx: mpsc::UnboundedSender<ActionResult>,
client: Arc<Mutex<Option<ZinitClient>>>,
}
impl ActionExecutor {
pub fn new(socket_path: PathBuf, result_tx: mpsc::UnboundedSender<ActionResult>) -> Self {
Self {
socket_path,
result_tx,
client: Arc::new(Mutex::new(None)),
}
}
pub fn spawn(&self, action: Action) {
let client = Arc::clone(&self.client);
let result_tx = self.result_tx.clone();
let socket_path = self.socket_path.clone();
let action_timeout = action.timeout();
tokio::spawn(async move {
let result = timeout(
action_timeout,
Self::execute_action(action.clone(), client, socket_path),
)
.await;
let result = match result {
Ok(r) => r,
Err(_) => {
Self::timeout_result(&action, action_timeout)
}
};
let _ = result_tx.send(result);
});
}
fn timeout_result(action: &Action, duration: Duration) -> ActionResult {
let timeout_msg = format!("Timeout after {}s", duration.as_secs());
match action {
Action::FetchServices => ActionResult::Services(Err(timeout_msg)),
Action::FetchLogs(name) => ActionResult::Logs(name.clone(), Err(timeout_msg)),
Action::FetchChildren(name) => ActionResult::Children(name.clone(), Err(timeout_msg)),
Action::FetchWhy(name) => ActionResult::Why(name.clone(), Err(timeout_msg)),
Action::StartService(name)
| Action::StopService(name)
| Action::RestartService(name)
| Action::RemoveService(name)
| Action::HaltAndRemove(name) => ActionResult::OperationComplete {
action: action.display_name().into(),
service: name.clone(),
result: Err(timeout_msg),
},
Action::KillService(name, _) => ActionResult::OperationComplete {
action: "kill".into(),
service: name.clone(),
result: Err(timeout_msg),
},
Action::Quit => ActionResult::OperationComplete {
action: "quit".into(),
service: String::new(),
result: Err(timeout_msg),
},
}
}
async fn execute_action(
action: Action,
client: Arc<Mutex<Option<ZinitClient>>>,
socket_path: PathBuf,
) -> ActionResult {
let mut guard = client.lock().await;
if guard.is_none() {
let c = ZinitClient::unix(&socket_path);
*guard = Some(c);
}
let client = guard.as_mut().unwrap();
match action {
Action::FetchServices => Self::fetch_services(client).await,
Action::FetchLogs(name) => Self::fetch_logs(client, &name).await,
Action::FetchChildren(name) => Self::fetch_children(client, &name).await,
Action::FetchWhy(name) => Self::fetch_why(client, &name).await,
Action::StartService(name) => Self::start_service(client, &name).await,
Action::StopService(name) => Self::stop_service(client, &name).await,
Action::RestartService(name) => Self::restart_service(client, &name).await,
Action::KillService(name, signal) => Self::kill_service(client, &name, &signal).await,
Action::RemoveService(name) => Self::remove_service(client, &name).await,
Action::HaltAndRemove(name) => Self::halt_and_remove(client, &name).await,
Action::Quit => ActionResult::OperationComplete {
action: "quit".into(),
service: String::new(),
result: Ok(()),
},
}
}
async fn fetch_services(client: &mut ZinitClient) -> ActionResult {
match client.list().await {
Ok(names) => {
let mut services = Vec::with_capacity(names.len());
for name in names {
match client.status(&name).await {
Ok(status) => services.push(ServiceDisplayInfo::from_status(status)),
Err(_) => services.push(ServiceDisplayInfo::unknown(name)),
}
}
services.sort_by(|a, b| a.name.cmp(&b.name));
ActionResult::Services(Ok(services))
}
Err(e) => ActionResult::Services(Err(e.to_string())),
}
}
async fn fetch_logs(client: &mut ZinitClient, name: &str) -> ActionResult {
match client.logs(Some(name), Some(200)).await {
Ok(logs) => ActionResult::Logs(name.to_string(), Ok(logs)),
Err(e) => ActionResult::Logs(name.to_string(), Err(e.to_string())),
}
}
async fn fetch_children(client: &mut ZinitClient, name: &str) -> ActionResult {
match client.children(name).await {
Ok(response) => ActionResult::Children(name.to_string(), Ok(response.children)),
Err(e) => ActionResult::Children(name.to_string(), Err(e.to_string())),
}
}
async fn fetch_why(client: &mut ZinitClient, name: &str) -> ActionResult {
match client.why(name).await {
Ok(why) => ActionResult::Why(name.to_string(), Ok(why)),
Err(e) => ActionResult::Why(name.to_string(), Err(e.to_string())),
}
}
async fn start_service(client: &mut ZinitClient, name: &str) -> ActionResult {
let result = client.start(name).await.map_err(|e| e.to_string());
ActionResult::OperationComplete {
action: "start".into(),
service: name.to_string(),
result,
}
}
async fn stop_service(client: &mut ZinitClient, name: &str) -> ActionResult {
let result = client.stop(name).await.map_err(|e| e.to_string());
ActionResult::OperationComplete {
action: "stop".into(),
service: name.to_string(),
result,
}
}
async fn restart_service(client: &mut ZinitClient, name: &str) -> ActionResult {
let result = client.restart(name).await.map_err(|e| e.to_string());
ActionResult::OperationComplete {
action: "restart".into(),
service: name.to_string(),
result,
}
}
async fn kill_service(client: &mut ZinitClient, name: &str, signal: &str) -> ActionResult {
let result = client
.kill(name, Some(signal))
.await
.map_err(|e| e.to_string());
ActionResult::OperationComplete {
action: "kill".into(),
service: name.to_string(),
result,
}
}
async fn remove_service(client: &mut ZinitClient, name: &str) -> ActionResult {
let result = client.service_delete(name).await.map_err(|e| e.to_string());
ActionResult::OperationComplete {
action: "remove".into(),
service: name.to_string(),
result,
}
}
async fn halt_and_remove(client: &mut ZinitClient, name: &str) -> ActionResult {
let _ = client.kill(name, Some("SIGKILL")).await;
tokio::time::sleep(Duration::from_millis(100)).await;
let result = client.service_delete(name).await.map_err(|e| e.to_string());
ActionResult::OperationComplete {
action: "halt+remove".into(),
service: name.to_string(),
result,
}
}
}