// gadget_sdk/executor/process/manager.rs
use super::error::Error;
use crate::executor::process::types::{GadgetProcess, ProcessOutput, Status};
use crate::executor::process::utils::*;
use crate::executor::OS_COMMAND;
use crate::{craft_child_process, run_command};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use sysinfo::System;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::sync::broadcast;
/// Manager for gadget-executor processes.
///
/// Every spawned process is recorded under the name of its Service so that it can be
/// looked up and controlled by that name later on.
///
/// The manager derives `Serialize` and `Deserialize`, which is what allows its state to be
/// written out by `save_state` and read back by `new_from_saved` to recover a
/// gadget-executor.
#[derive(Serialize, Deserialize, Debug)]
pub struct GadgetProcessManager {
/// All children spawned by this manager, keyed by the name of the Service each one runs.
pub children: HashMap<String, GadgetProcess>,
}
impl GadgetProcessManager {
pub fn new() -> GadgetProcessManager {
GadgetProcessManager {
children: HashMap::new(),
}
}
/// Load the state of previously running processes to recover gadget-executor
#[allow(dead_code)]
pub(crate) async fn new_from_saved(
file: &str,
) -> Result<GadgetProcessManager, crate::error::Error> {
let file = std::fs::File::open(file)?;
let mut new_manager: GadgetProcessManager = serde_json::from_reader(file)?;
// Restarts processes that were previously running
new_manager.restart_dead().await?;
Ok(new_manager)
}
/// Stores the state of the current processes.
///
/// Serializes this manager to JSON, writes it to `./savestate.json` (relative to the current
/// working directory) and returns the serialized JSON.
///
/// # Errors
/// Returns an error if serialization fails or if the file cannot be created, written or
/// flushed.
#[allow(dead_code)]
pub(crate) async fn save_state(&self) -> Result<String, crate::error::Error> {
    let serialized_data = serde_json::to_string(self)?;
    let mut file = File::create("./savestate.json").await?;
    file.write_all(serialized_data.as_bytes()).await?;
    // A tokio `File` can still have the write in flight when it is dropped; flushing makes
    // sure the operation has completed (and surfaces its error) before we report success.
    file.flush().await?;
    Ok(serialized_data)
}
/// Spawns `command` and records the resulting process under `identifier`.
///
/// On success the same identifier is handed back to the caller. An entry already stored
/// under `identifier` is replaced by the new process.
///
/// # Errors
/// Propagates the error produced by `run_command!` when the command cannot be started.
#[allow(unused_results)]
pub async fn run(&mut self, identifier: String, command: &str) -> Result<String, Error> {
    let spawned = run_command!(command)?;
    let key = identifier.clone();
    self.children.insert(key, spawned);
    Ok(identifier)
}
/// Runs the given command and returns a [stream](broadcast::Receiver) of its output
#[allow(unused_results)]
pub async fn start_process_and_get_output(
&mut self,
identifier: String,
command: &str,
) -> Result<broadcast::Receiver<String>, Error> {
self.run(identifier.clone(), command).await?;
let process = self
.children
.get_mut(&identifier)
.ok_or(Error::ServiceNotFound(identifier))?;
process.resubscribe()
}
/// Focuses on the given service until its stream is exhausted, meaning that the process ran to completion.
pub async fn focus_service_to_completion(&mut self, service: String) -> Result<String, Error> {
let process = self
.children
.get_mut(&service)
.ok_or(Error::ServiceNotFound(service))?;
let mut output_stream = String::new();
loop {
match process.read_until_default_timeout().await {
ProcessOutput::Output(output) => {
output_stream.push_str(&format!("{output:?}\n"));
continue;
}
ProcessOutput::Exhausted(output) => {
output_stream.push_str(&format!("{output:?}\n"));
break;
}
ProcessOutput::Waiting => {
continue;
}
}
}
Ok(output_stream)
}
/// Focuses on the given service until its output includes the substring provided. Returns a
/// ProcessOutput. ProcessOutput::Output means that the substring was received,
/// ProcessOutput::Exhausted means that the substring was not found and the stream was
/// exhausted, ProcessOutput::Waiting means that the substring was never found and the stream
/// is not exhausted (an error likely occurred).
pub(crate) async fn focus_service_until_output_contains(
&mut self,
service: String,
specified_output: String,
) -> Result<ProcessOutput, Error> {
let process = self
.children
.get_mut(&service)
.ok_or(Error::ServiceNotFound(service))?;
Ok(process.read_until_receiving_string(specified_output).await)
}
/// Removes processes that are no longer running from the manager and returns the names of
/// the processes that were removed.
///
/// A child counts as still running only when the system reports a process for its recorded
/// PID *and* that process carries the recorded process name. A missing PID means the child
/// has exited; a differing name means some unrelated process has since taken over the PID.
/// Both cases are treated as dead.
///
/// The order of the returned names is unspecified, since it follows `HashMap` iteration.
///
/// # Errors
/// This implementation does not produce an error itself; the `Result` return type is kept
/// so existing callers continue to work.
#[allow(dead_code)]
pub(crate) async fn remove_dead(&mut self) -> Result<Vec<String>, Error> {
    let system = System::new_all();
    let mut dead_processes = Vec::new();
    // Decide and drop in a single pass, recording the name of every entry that is removed.
    self.children.retain(|name, child| {
        let still_running = match system.process(child.pid) {
            Some(process) => process.name() == child.process_name,
            None => false,
        };
        if !still_running {
            dead_processes.push(name.clone());
        }
        still_running
    });
    // TODO: If dead children are `supposed` to be running, we should start them up again instead of just removing them
    Ok(dead_processes)
}
/// Finds all dead processes that still exist in the map and starts them again. This
/// function is used to restart all processes after loading a Manager from a file.
///
/// Children whose status is `Active` or `Sleeping` are left untouched. Children that are
/// `Inactive`, `Dead`, or in an `Unknown` state (which is additionally logged) are restarted,
/// and the restarted process replaces the stale entry under the same service name.
///
/// # Errors
/// Returns the first error raised while querying a status or restarting a process. Entries
/// that were already restarted before the failure stay in the map with their new process.
pub(crate) async fn restart_dead(&mut self) -> Result<(), Error> {
    for (name, process) in self.children.iter_mut() {
        match process.status()? {
            Status::Active | Status::Sleeping => {
                // TODO: Metrics + Logs for these living processes
                // Still running as expected, nothing to do for this child.
                continue;
            }
            Status::Inactive | Status::Dead => {
                // Dead, should be restarted
            }
            Status::Unknown(code) => {
                println!("LOG : {} yielded {}", name, code);
            }
        }
        // The key does not change, so overwrite the entry in place instead of collecting,
        // removing and re-inserting it afterwards.
        *process = process.restart_process().await?;
    }
    Ok(())
}
}
impl Default for GadgetProcessManager {
fn default() -> Self {
Self::new()
}
}