1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#![deny(missing_docs)]
//! Process supervisor with inter-process communication support using tokio.
//!
//! Currently only supports Unix, later we plan to add support for Windows using named pipes.
//!
//! ## Supervisor
//!
//! Supervisor manages child processes sending socket information using the environment and then switching
//! to Unix domain sockets for inter-process communication. Daemon processes are restarted if
//! they die without being shutdown by the supervisor.
//!
//! ```ignore
//! use psup::{Result, Task, SupervisorBuilder};
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!    let worker_cmd = "worker-process";
//!    let supervisor = SupervisorBuilder::new(Box::new(|stream| {
//!            let (reader, mut writer) = stream.into_split();
//!            // Handle worker connections here
//!        }))
//!        .path(std::env::temp_dir().join("supervisor.sock"))
//!        .add_worker(Task::new(worker_cmd).daemon(true))
//!        .add_worker(Task::new(worker_cmd).daemon(true))
//!        .build();
//!    supervisor.run().await?;
//!    // Block the process here and do your work.
//!    Ok(())
//! }
//! ```
//!
//! ## Worker
//!
//! Worker reads the socket information from the environment and then connects to the Unix socket.
//!
//! ```ignore
//! use psup::{Result, worker};
//!
//! #[tokio::main]
//! async fn main() -> Result<()> {
//!     // Read supervisor information from the environment 
//!     // and set up the IPC channel with the supervisor
//!     let worker = Worker::new(|stream| {
//!         let (reader, mut writer) = stream.into_split();
//!         // Start sending messages to the supervisor
//!     });
//!     worker.run().await?;
//!     // Block the process here and do your work.
//!     Ok(())
//! }
//! ```

use std::path::PathBuf;

/// Enumeration of errors.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// Worker is missing the id environment variable.
    #[error("Worker PSUP_WORKER_ID variable is not set")]
    WorkerNoId,

    /// Worker is missing the socket path environment variable.
    #[error("Worker PSUP_SOCKET variable is not set")]
    WorkerNoSocket,

    /// Worker is missing the detached flag environment variable.
    #[error("Worker PSUP_DETACHED variable is not set")]
    WorkerNoDetached,

    /// Input/output errors.
    #[error(transparent)]
    Io(#[from] std::io::Error),

    /// Error whilst sending bind notifications.
    #[error(transparent)]
    Oneshot(#[from] tokio::sync::oneshot::error::RecvError),

    /// Generic variant for errors created in user code.
    #[error(transparent)]
    Boxed(#[from] Box<dyn std::error::Error + Send>),
}

impl Error {
    /// Helper function to `Box` an error implementation.
    ///
    /// Ssocket handlers can call `map_err(Error::boxed)?` to propagate
    /// foreign errors.
    pub fn boxed(e: impl std::error::Error + Send + 'static) -> Self {
        let err: Box<dyn std::error::Error + Send> = Box::new(e);
        Error::from(err)
    }
}

/// Result type returned by the library.
pub type Result<T> = std::result::Result<T, Error>;

pub(crate) const WORKER_ID: &str = "PSUP_WORKER_ID";
pub(crate) const SOCKET: &str = "PSUP_SOCKET";
pub(crate) const DETACHED: &str = "PSUP_DETACHED";

mod supervisor;
mod worker;

pub use supervisor::{Supervisor, SupervisorBuilder, Task};
pub use worker::Worker;

/// Message sent to worker handlers.
#[derive(Debug)]
pub struct WorkerInfo {
    /// Socket path.
    pub path: PathBuf,
    /// Worker identifier.
    pub id: String,
}