use std::{
collections::HashMap,
fmt::Debug,
io,
path::{Path, PathBuf},
time::Duration,
};
use tokio::{
sync::{
broadcast,
mpsc::{self},
oneshot::{self},
},
task::JoinHandle,
};
use tokio_stream::{
wrappers::{errors::BroadcastStreamRecvError, BroadcastStream},
Stream, StreamExt,
};
use uuid::Uuid;
use crate::working_dir::WorkingDir;
use super::{
log_reader::{LogReaderError, LogsQuery, LogsQueryType},
message::Message,
scoped_process_handle::ScopedProcessHandle,
spawner::ProcessSpawner,
ProcessHandle, ProcessInfo, Runnable,
};
use super::{
model::{Process, ProcessId},
spawner::SpawnerError,
};
/// Requests sent from a `ProcessManagerHandle` to the `ProcessManager` task.
///
/// Every variant carries a `responder`: the oneshot sender on which the
/// manager sends back the outcome of handling the request.
#[derive(Debug)]
enum ProcessManagerMessage {
/// Spawn a new process from `cmd`; the reply is the id assigned to it.
SpawnProcess {
cmd: Box<dyn Runnable>,
responder: oneshot::Sender<Result<ProcessId, SpawnProcessError>>,
},
/// Obtain a broadcast receiver of the raw messages of process `id`.
SubscribeMessageStream {
id: ProcessId,
responder: oneshot::Sender<Result<broadcast::Receiver<Vec<u8>>, ReadMessageError>>,
},
/// Write the bytes in `data` to the message writer of process `id`.
WriteMessage {
id: ProcessId,
data: Vec<u8>,
responder: oneshot::Sender<Result<(), WriteMessageError>>,
},
/// Tear down process `id` and remove its process directory.
KillProcess {
id: ProcessId,
responder: oneshot::Sender<Result<(), KillProcessError>>,
},
/// Read log lines of process `id` from the stream selected by
/// `logs_query_type`, restricted by `query`.
GetLogs {
id: ProcessId,
logs_query_type: LogsQueryType,
query: LogsQuery,
responder: oneshot::Sender<Result<Vec<String>, GetLogsError>>,
},
/// Fetch the pid and (if already exited) the exit status of process `id`.
GetProcessInfo {
id: ProcessId,
responder: oneshot::Sender<Result<ProcessInfo, GetProcessInfoError>>,
},
/// Stop the manager: close its request channel and tear down every process.
Abort {
responder: oneshot::Sender<()>,
},
}
/// State owned by the background task that serves `ProcessManagerMessage`
/// requests.
///
/// An instance is created and moved into a spawned Tokio task by
/// `ProcessManager::spawn`; callers interact with it only through the returned
/// `ProcessManagerHandle`.
pub struct ProcessManager {
// Used to derive the per-process directory (`process_dir`) for cleanup.
working_dir: WorkingDir,
// Creates `Process` values from runnables.
process_spawner: ProcessSpawner,
// Every process that was spawned and has not been killed yet.
processes: HashMap<ProcessId, Process>,
// Incoming requests from all handles.
receiver: mpsc::Receiver<ProcessManagerMessage>,
// Set by `abort`; makes the `run` loop terminate.
is_aborted: bool,
}
impl ProcessManager {
/// Starts a process manager task and returns a handle for talking to it.
///
/// `working_directory` is first probed for write access (see
/// `validate_working_dir`); any I/O error from that probe is returned and no
/// task is started.
///
/// The manager is run via `tokio::spawn`, so this must be called from within
/// a Tokio runtime. Requests are queued on a channel with capacity 32.
pub fn spawn(working_directory: PathBuf) -> Result<ProcessManagerHandle, io::Error> {
    Self::validate_working_dir(&working_directory)?;

    let working_dir = WorkingDir::new(working_directory);
    let process_spawner = ProcessSpawner::new(working_dir.clone());
    let (tx, rx) = mpsc::channel(32);

    let mut manager = Self {
        working_dir,
        process_spawner,
        processes: HashMap::new(),
        receiver: rx,
        is_aborted: false,
    };
    // The task is detached: it ends on its own once the manager is aborted
    // or every handle (sender) has been dropped.
    tokio::spawn(async move {
        manager.run().await;
    });

    Ok(ProcessManagerHandle::new(tx))
}
/// Checks that files can be created and deleted inside `working_directory`.
///
/// A file with a random UUID name is written and then removed; the first I/O
/// error encountered is returned.
fn validate_working_dir(working_directory: &Path) -> Result<(), io::Error> {
    let probe_name = Uuid::new_v4().to_string();
    let probe_path = working_directory.join(probe_name);
    std::fs::write(&probe_path, "testing working dir")?;
    std::fs::remove_file(&probe_path)?;
    Ok(())
}
/// Main loop of the manager task.
///
/// Handles requests one at a time until the manager is aborted. When the
/// channel yields `None` (no more requests can arrive), the manager aborts
/// itself, which also ends the loop.
async fn run(&mut self) {
    while !self.is_aborted {
        match self.receiver.recv().await {
            Some(msg) => self.handle_message(msg).await,
            None => self.abort().await,
        }
    }
}
/// Dispatches one request to the matching operation and sends the outcome
/// back on the request's `responder`.
///
/// A failed reply (the requester is no longer waiting) is deliberately
/// ignored.
async fn handle_message(&mut self, msg: ProcessManagerMessage) {
    match msg {
        ProcessManagerMessage::SpawnProcess { cmd, responder } => {
            let _ = responder.send(self.spawn_process(cmd).await);
        }
        ProcessManagerMessage::SubscribeMessageStream { id, responder } => {
            let _ = responder.send(self.subscribe_to_message_stream(id).await);
        }
        ProcessManagerMessage::WriteMessage {
            id,
            data,
            responder,
        } => {
            let _ = responder.send(self.send_message(id, data).await);
        }
        ProcessManagerMessage::KillProcess { id, responder } => {
            let _ = responder.send(self.kill_process(id).await);
        }
        ProcessManagerMessage::GetLogs {
            id,
            logs_query_type,
            query,
            responder,
        } => {
            let _ = responder.send(self.get_logs(id, logs_query_type, query).await);
        }
        ProcessManagerMessage::GetProcessInfo { id, responder } => {
            let _ = responder.send(self.get_process_info(id));
        }
        ProcessManagerMessage::Abort { responder } => {
            // The reply is sent only after every process has been torn down.
            self.abort().await;
            let _ = responder.send(());
        }
    }
}
async fn spawn_process(
&mut self,
runnable: Box<dyn Runnable>,
) -> Result<ProcessId, SpawnProcessError> {
let id = ProcessId::random();
let process = self.process_spawner.spawn_runnable(&id, runnable)?;
self.processes.insert(id, process);
Ok(id)
}
/// Tears down the process registered under `id`.
///
/// The message reader, message writer and log reader are aborted (in that
/// order, each only if present), the process is removed from the registry,
/// and finally its process directory is deleted on a best-effort basis.
///
/// Returns `KillProcessError::ProcessNotFound` when `id` is not registered.
async fn kill_process(&mut self, id: ProcessId) -> Result<(), KillProcessError> {
    let process = match self.processes.get_mut(&id) {
        Some(process) => process,
        None => return Err(KillProcessError::ProcessNotFound(id)),
    };

    if let Some(message_reader) = process.message_reader.take() {
        message_reader.abort().await;
    }
    if let Some(message_writer) = process.message_writer.take() {
        message_writer.abort().await;
    }
    if let Some(logs) = process.log_reader.take() {
        logs.abort().await;
    }

    // NOTE(review): the child is never signalled explicitly here; presumably
    // dropping the removed `Process` terminates it -- confirm.
    self.processes.remove(&id);

    // Directory removal failures are intentionally ignored.
    let dir = self.working_dir.process_dir(&id);
    let _ = tokio::fs::remove_dir_all(dir).await;
    Ok(())
}
/// Creates a new broadcast receiver for the raw messages of process `id`.
///
/// # Errors
/// * `ProcessNotFound` - `id` is not registered.
/// * `MessageReaderNotFound` - the process has no message reader.
/// * `MessageReaderKilled` - the reader failed to provide a subscription.
async fn subscribe_to_message_stream(
    &mut self,
    id: ProcessId,
) -> Result<broadcast::Receiver<Vec<u8>>, ReadMessageError> {
    let process = self
        .processes
        .get(&id)
        .ok_or(ReadMessageError::ProcessNotFound(id))?;
    let reader = process
        .message_reader
        .as_ref()
        .ok_or(ReadMessageError::MessageReaderNotFound(id))?;
    reader
        .subscribe()
        .await
        .map_err(|_| ReadMessageError::MessageReaderKilled)
}
/// Writes `data` to the message writer of process `id`.
///
/// # Errors
/// * `ProcessNotFound` - `id` is not registered.
/// * `MessageWriterNotFound` - the process has no message writer.
/// * Any error of the write itself, converted into `WriteMessageError`.
async fn send_message(
    &mut self,
    id: ProcessId,
    data: Vec<u8>,
) -> Result<(), WriteMessageError> {
    let process = self
        .processes
        .get(&id)
        .ok_or(WriteMessageError::ProcessNotFound(id))?;
    let writer = process
        .message_writer
        .as_ref()
        .ok_or(WriteMessageError::MessageWriterNotFound(id))?;
    writer.write(data).await.map_err(Into::into)
}
/// Reads log lines of process `id` from the stream selected by
/// `logs_query_type`, restricted by `query`.
///
/// # Errors
/// * `ProcessNotFound` - `id` is not registered.
/// * `LogReaderNotFound` - the process has no log reader.
/// * Any error of the log reader, converted into `GetLogsError`.
async fn get_logs(
    &mut self,
    id: ProcessId,
    logs_query_type: LogsQueryType,
    query: LogsQuery,
) -> Result<Vec<String>, GetLogsError> {
    let process = self
        .processes
        .get(&id)
        .ok_or(GetLogsError::ProcessNotFound(id))?;
    let log_reader = process
        .log_reader
        .as_ref()
        .ok_or(GetLogsError::LogReaderNotFound(id))?;
    log_reader
        .read_logs(logs_query_type, query)
        .await
        .map_err(Into::into)
}
/// Builds a `ProcessInfo` for process `id` from the child's id and the
/// result of a non-blocking `try_wait`.
///
/// Returns `ProcessNotFound` when `id` is not registered; an I/O error from
/// `try_wait` is propagated.
fn get_process_info(&mut self, id: ProcessId) -> Result<ProcessInfo, GetProcessInfoError> {
    match self.processes.get_mut(&id) {
        None => Err(GetProcessInfoError::ProcessNotFound(id)),
        Some(process) => {
            // The id is read before polling the exit status, as in the
            // original statement order.
            let pid = process.child.id();
            let exit_status = process.child.try_wait()?;
            Ok(ProcessInfo::new(pid, exit_status))
        }
    }
}
/// Shuts the manager down.
///
/// Marks the manager as aborted, closes the request channel and tears down
/// every registered process; individual teardown errors are ignored.
async fn abort(&mut self) {
    self.is_aborted = true;
    self.receiver.close();

    // Snapshot the ids first: `kill_process` mutates the registry.
    let registered: Vec<ProcessId> = self.processes.keys().copied().collect();
    for process_id in registered {
        let _ = self.kill_process(process_id).await;
    }
}
}
impl Drop for ProcessManager {
    /// Best-effort removal of the process directory of every process that is
    /// still registered when the manager is dropped. Failures are ignored.
    fn drop(&mut self) {
        for id in self.processes.keys() {
            let _ = std::fs::remove_dir_all(self.working_dir.process_dir(id));
        }
    }
}
/// Cloneable handle used to send requests to a running `ProcessManager` task.
///
/// All clones share the same request channel.
#[derive(Clone, Debug)]
pub struct ProcessManagerHandle {
// Sending side of the manager's request channel.
sender: mpsc::Sender<ProcessManagerMessage>,
}
impl ProcessManagerHandle {
/// Wraps the sending side of the manager's request channel in a handle.
fn new(sender: mpsc::Sender<ProcessManagerMessage>) -> Self {
Self { sender }
}
/// Asks the manager to spawn `runnable` and returns the id of the new process.
///
/// # Errors
/// Returns the manager's spawn error, or `ManagerCommunicationError` when no
/// reply arrives (a failed send is not reported separately: it drops the
/// responder, which surfaces as the same error).
pub async fn spawn(&self, runnable: impl Runnable) -> Result<ProcessId, SpawnProcessError> {
    let (responder, reply) = oneshot::channel();
    let request = ProcessManagerMessage::SpawnProcess {
        cmd: Box::new(runnable),
        responder,
    };
    let _ = self.sender.send(request).await;
    // `?` maps a missing reply to the error type; the reply itself is
    // already the `Result` to hand back.
    reply.await?
}
/// Spawns `runnable` and returns a `ProcessHandle` bound to the new process
/// id and a clone of this manager handle.
pub async fn spawn_with_handle(
    &self,
    runnable: impl Runnable,
) -> Result<ProcessHandle, SpawnProcessError> {
    self.spawn(runnable)
        .await
        .map(|id| ProcessHandle::new(id, self.clone()))
}
/// Spawns `runnable` and returns a `ScopedProcessHandle` wrapping a
/// `ProcessHandle` for the new process.
pub async fn spawn_with_scoped_handle(
    &self,
    runnable: impl Runnable,
) -> Result<ScopedProcessHandle, SpawnProcessError> {
    let id = self.spawn(runnable).await?;
    let handle = ProcessHandle::new(id, self.clone());
    Ok(ScopedProcessHandle::new(handle))
}
/// Sends `message` to process `id`.
///
/// The message bytes are terminated with `\n` if they do not already end
/// with one. An empty message is not sent at all and the call returns
/// `Ok(())`.
///
/// # Errors
/// Returns the manager's write error, or `ManagerCommunicationError` when no
/// reply arrives.
pub async fn send_message<M>(&self, id: ProcessId, message: M) -> Result<(), WriteMessageError>
where
    M: Into<Message>,
{
    let data = match Self::into_bytes_with_eol_char(message.into().bytes) {
        Some(data) => data,
        None => return Ok(()),
    };
    let (responder, reply) = oneshot::channel();
    let request = ProcessManagerMessage::WriteMessage {
        id,
        data,
        responder,
    };
    let _ = self.sender.send(request).await;
    reply.await?
}
/// Ensures `bytes` ends with a newline.
///
/// Returns `None` for empty input; otherwise returns the bytes, with `\n`
/// appended when the last byte is not already `\n`.
fn into_bytes_with_eol_char(mut bytes: Vec<u8>) -> Option<Vec<u8>> {
    match bytes.last().copied() {
        None => None,
        Some(b'\n') => Some(bytes),
        Some(_) => {
            bytes.push(b'\n');
            Some(bytes)
        }
    }
}
pub async fn subscribe_message_stream(
&self,
id: ProcessId,
) -> Result<impl Stream<Item = Result<Message, ReceiveMessageError>>, ReadMessageError> {
let (responder, receiver) = oneshot::channel();
let msg = ProcessManagerMessage::SubscribeMessageStream { id, responder };
let _ = self.sender.send(msg).await;
let message_receiver = receiver.await??;
let stream = BroadcastStream::new(message_receiver)
.map(|v| v.map(Message::from_bytes).map_err(Into::into));
Ok(stream)
}
/// Fetches log lines of process `id` with the `Stdout` query type,
/// restricted by `query`.
pub async fn get_logs_stdout(
    &self,
    id: ProcessId,
    query: LogsQuery,
) -> Result<Vec<String>, GetLogsError> {
    let query_type = LogsQueryType::Stdout;
    self.get_logs(id, query_type, query).await
}
/// Fetches log lines of process `id` with the `Stderr` query type,
/// restricted by `query`.
pub async fn get_logs_stderr(
    &self,
    id: ProcessId,
    query: LogsQuery,
) -> Result<Vec<String>, GetLogsError> {
    let query_type = LogsQueryType::Stderr;
    self.get_logs(id, query_type, query).await
}
/// Shared implementation of the public log getters: sends a `GetLogs`
/// request and waits for the manager's reply.
///
/// # Errors
/// Returns the manager's error, or `ManagerCommunicationError` when no reply
/// arrives.
async fn get_logs(
    &self,
    id: ProcessId,
    logs_query_type: LogsQueryType,
    query: LogsQuery,
) -> Result<Vec<String>, GetLogsError> {
    let (responder, reply) = oneshot::channel();
    let request = ProcessManagerMessage::GetLogs {
        id,
        logs_query_type,
        query,
        responder,
    };
    let _ = self.sender.send(request).await;
    reply.await?
}
/// Requests the current `ProcessInfo` of process `id` from the manager.
///
/// # Errors
/// Returns the manager's error, or `ManagerCommunicationError` when no reply
/// arrives.
pub async fn get_process_info(
    &self,
    id: ProcessId,
) -> Result<ProcessInfo, GetProcessInfoError> {
    let (responder, reply) = oneshot::channel();
    let request = ProcessManagerMessage::GetProcessInfo { id, responder };
    let _ = self.sender.send(request).await;
    reply.await?
}
/// Spawns a task that polls process `id` until it is no longer running.
///
/// The task queries the process info, and while the process is still
/// running sleeps for `poll_interval` before asking again. It resolves to
/// the final `ProcessInfo`, or to the first error returned by a query.
///
/// Uses `tokio::spawn`, so it must be called from within a Tokio runtime.
pub fn wait(
    &self,
    id: ProcessId,
    poll_interval: Duration,
) -> JoinHandle<Result<ProcessInfo, GetProcessInfoError>> {
    let manager = self.clone();
    tokio::spawn(async move {
        loop {
            let info = manager.get_process_info(id).await?;
            if !info.is_running() {
                return Ok(info);
            }
            tokio::time::sleep(poll_interval).await;
        }
    })
}
/// Asks the manager to tear down process `id` and waits for the outcome.
///
/// # Errors
/// Returns the manager's error, or `ManagerCommunicationError` when no reply
/// arrives.
pub async fn kill(&self, id: ProcessId) -> Result<(), KillProcessError> {
    let (responder, reply) = oneshot::channel();
    let request = ProcessManagerMessage::KillProcess { id, responder };
    let _ = self.sender.send(request).await;
    reply.await?
}
/// Asks the manager to shut down and waits until it confirms.
///
/// # Errors
/// Returns `AbortError::ManagerAlreadyAborted` when the request cannot be
/// sent or no confirmation arrives.
pub async fn abort(&self) -> Result<(), AbortError> {
    let (responder, confirmation) = oneshot::channel();
    let request = ProcessManagerMessage::Abort { responder };
    if self.sender.send(request).await.is_err() {
        return Err(AbortError::ManagerAlreadyAborted);
    }
    confirmation
        .await
        .map_err(|_| AbortError::ManagerAlreadyAborted)
}
/// Fire-and-forget kill request that never waits.
///
/// Returns `true` when the request was queued and `false` when `try_send`
/// failed. The outcome of the kill itself is not observed: the receiving
/// half of the reply channel is discarded immediately.
pub(crate) fn try_kill(&self, id: ProcessId) -> bool {
    let (responder, _) = oneshot::channel();
    self.sender
        .try_send(ProcessManagerMessage::KillProcess { id, responder })
        .is_ok()
}
/// Sends a kill request for process `id`, waiting at most `duration` for
/// room in the request queue, then waits for the manager's reply.
///
/// All errors are ignored. Note that `duration` bounds only the enqueue
/// step; once the request is queued, waiting for the reply is unbounded.
pub(crate) async fn kill_with_timeout(&self, id: ProcessId, duration: Duration) {
    let (responder, reply) = oneshot::channel();
    let request = ProcessManagerMessage::KillProcess { id, responder };
    // A failed send drops the request together with its responder, so there
    // is no reply to wait for in that case.
    if self.sender.send_timeout(request, duration).await.is_ok() {
        let _ = reply.await;
    }
}
}
/// Error item yielded by the stream returned from
/// `ProcessManagerHandle::subscribe_message_stream`.
#[derive(thiserror::Error, Debug)]
pub enum ReceiveMessageError {
/// The subscriber lagged behind; the payload is the number of messages
/// that were skipped.
#[error("Lost {0} number of messages due to buffer capacity overflow")]
LostMessages(u64),
}
/// Counterpart of `ReceiveMessageError` with an additional deserialization
/// failure variant.
///
/// NOTE(review): not produced anywhere in this part of the file; presumably
/// used by typed message streams defined elsewhere -- confirm.
#[derive(thiserror::Error, Debug)]
pub enum ReceiveDeserializedMessageError {
/// Same meaning as `ReceiveMessageError::LostMessages`.
#[error("Lost {0} number of messages due to buffer capacity overflow")]
LostMessages(u64),
/// A message could not be deserialized; the payload is the error text.
#[error("{0}")]
CannotDeserializeMessage(String),
}
impl From<BroadcastStreamRecvError> for ReceiveMessageError {
    /// Maps a lagged broadcast receiver onto `LostMessages`, keeping the
    /// number of skipped messages.
    fn from(err: BroadcastStreamRecvError) -> Self {
        match err {
            BroadcastStreamRecvError::Lagged(skipped) => Self::LostMessages(skipped),
        }
    }
}
impl From<ReceiveMessageError> for ReceiveDeserializedMessageError {
    /// Carries the lost-message count over unchanged.
    fn from(value: ReceiveMessageError) -> Self {
        // `ReceiveMessageError` has a single variant, so the pattern is
        // irrefutable; adding a variant turns this into a compile error.
        let ReceiveMessageError::LostMessages(count) = value;
        Self::LostMessages(count)
    }
}
/// Errors returned when spawning a process through the manager.
#[derive(thiserror::Error, Debug)]
pub enum SpawnProcessError {
/// No reply was received from the manager task.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
/// A named pipe could not be created; carries the pipe path and the
/// stringified underlying error.
#[error("Cannot create named pipe at path: {0}. Error code: {1}")]
CannotCreateNamedPipe(PathBuf, String),
/// Spawning the process failed with an I/O error.
#[error("Cannot spawn process: {0}")]
CannotSpawnProcess(#[from] io::Error),
/// The process directory could not be created.
#[error("Cannot create process directory: {0}")]
CannotCreateProcessWorkingDir(io::Error),
/// The spawner reported a bootstrap failure; carries its description.
#[error("Bootstrap process failed: {0}")]
BootstrapProcessFailed(String),
}
impl From<SpawnerError> for SpawnProcessError {
    /// Maps every spawner error onto the variant of the same name; the
    /// named-pipe error is stored as its string representation.
    fn from(value: SpawnerError) -> Self {
        match value {
            SpawnerError::CannotCreateNamedPipe(path, source) => {
                Self::CannotCreateNamedPipe(path, source.to_string())
            }
            SpawnerError::CannotSpawnProcess(source) => Self::CannotSpawnProcess(source),
            SpawnerError::CannotCreateProcessWorkingDir(source) => {
                Self::CannotCreateProcessWorkingDir(source)
            }
            SpawnerError::BootstrapProcessFailed(reason) => Self::BootstrapProcessFailed(reason),
        }
    }
}
/// Errors returned when subscribing to a process's message stream.
#[derive(thiserror::Error, Debug)]
pub enum ReadMessageError {
/// No process is registered under the given id.
#[error("Process with id: {0} was not found")]
ProcessNotFound(ProcessId),
/// No reply was received from the manager task.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
/// The process exists but has no message reader.
#[error("Message reader for process with id: {0} was not found")]
MessageReaderNotFound(ProcessId),
/// The message reader failed to provide a subscription.
#[error("Message reader process has been killed")]
MessageReaderKilled,
}
/// Errors returned when sending a message to a process.
#[derive(thiserror::Error, Debug)]
pub enum WriteMessageError {
/// No process is registered under the given id.
#[error("Process with id: {0} was not found")]
ProcessNotFound(ProcessId),
/// A message could not be serialized; carries the error text.
/// NOTE(review): not produced in this part of the file -- presumably
/// raised by typed message helpers elsewhere; confirm.
#[error("{0}")]
CannotSerializeMessage(String),
/// No reply was received from the manager task.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
/// Writing to the process failed with an I/O error.
#[error("Error occurred when sending message to process: {0}")]
IoError(#[from] std::io::Error),
/// The process exists but has no message writer.
#[error("Message writer for process with id: {0} was not found")]
MessageWriterNotFound(ProcessId),
}
/// Errors returned when killing a process.
#[derive(thiserror::Error, Debug)]
pub enum KillProcessError {
/// No process is registered under the given id.
#[error("Process with id: {0} was not found")]
ProcessNotFound(ProcessId),
/// No reply was received from the manager task.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
}
#[derive(thiserror::Error, Debug)]
pub enum GetLogsError {
#[error("Process with id: {0} was not found")]
ProcessNotFound(ProcessId),
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
#[error("Logging type: {0} was not configured for process")]
LoggingTypeWasNotConfigured(String),
#[error("Log read for process with id: {0} was not found")]
LogReaderNotFound(ProcessId),
#[error(transparent)]
UnExpectedIoError(#[from] io::Error),
}
impl From<LogReaderError> for GetLogsError {
    /// Converts a log reader error: the unconfigured log type is kept as its
    /// string form, an I/O error is passed through unchanged.
    fn from(err: LogReaderError) -> Self {
        match err {
            LogReaderError::UnExpectedIoError(source) => Self::UnExpectedIoError(source),
            LogReaderError::LogTypeWasNotConfigured(kind) => {
                Self::LoggingTypeWasNotConfigured(kind.to_string())
            }
        }
    }
}
/// Errors returned when querying information about a process.
#[derive(thiserror::Error, Debug)]
pub enum GetProcessInfoError {
/// No process is registered under the given id.
#[error("Process with id: {0} was not found")]
ProcessNotFound(ProcessId),
/// No reply was received from the manager task.
#[error("Cannot communicate with spawned process manager")]
ManagerCommunicationError(#[from] oneshot::error::RecvError),
/// Checking the child's exit status failed; displayed as the underlying
/// I/O error.
#[error(transparent)]
UnExpectedIoError(#[from] io::Error),
}
/// Error returned by `ProcessManagerHandle::abort`.
#[derive(thiserror::Error, Debug)]
pub enum AbortError {
/// The abort request could not be sent, or no confirmation was received.
#[error("Cannot communicate with spawned process manager")]
ManagerAlreadyAborted,
}