psup_impl/
worker.rs

1//! Worker is a process performing a long-running task.
2use super::{Error, Result, SOCKET, WORKER_ID};
3use futures::Future;
4use tokio::net::UnixStream;
5
/// Worker process handler.
///
/// Holds an optional client handler of type `H` together with the
/// `relaxed` flag. The handler is a function taking a [`UnixStream`] and
/// a `String` and returning a future `F` that resolves to `Result<()>`.
pub struct Worker<H, F>
where
    H: Fn(UnixStream, String) -> F,
    F: Future<Output = Result<()>>,
{
    /// Client connection handler; `None` until one is set via `client`.
    handler: Option<H>,
    /// When `true`, a missing worker id in the environment is not an error.
    relaxed: bool,
}
15
16impl<H, F> Worker<H, F>
17where
18    H: Fn(UnixStream, String) -> F,
19    F: Future<Output = Result<()>>,
20{
21    /// Create a new worker.
22    pub fn new() -> Self {
23        Self {
24            handler: None,
25            relaxed: false,
26        }
27    }
28
29    /// Set the relaxed flag.
30    ///
31    /// When a worker is relaxed it will start with or without a supervisor.
32    ///
33    /// The default is `false` so workers expect to be run in the context
34    /// of a supervisor and it is an error if no worker id is available in
35    /// the environment.
36    ///
37    /// When this flag is enabled and the required environment variables
38    /// do not exist the worker does not attempt to connect to a supervisor.
39    ///
40    /// Use this mode of operation when a worker process can be run standalone
41    /// or as a worker for a supervisor process.
42    pub fn relaxed(mut self, flag: bool) -> Self {
43        self.relaxed = flag;
44        self
45    }
46
47    /// Set a client connection handler.
48    ///
49    /// The handler function receives the socket stream and opaque
50    /// worker identifier and can communicate with the supervisor using
51    /// the socket stream.
52    pub fn client(mut self, handler: H) -> Self {
53        self.handler = Some(handler);
54        self
55    }
56
57    /// Start this worker running.
58    pub async fn run(&self) -> Result<()> {
59        if let Some(ref handler) = self.handler {
60            if let Some(id) = std::env::var(WORKER_ID).ok() {
61                // If we were given a socket path make the connection.
62                if let Some(path) = std::env::var(SOCKET).ok() {
63                    // Connect to the supervisor socket
64                    let stream = UnixStream::connect(&path).await?;
65                    (handler)(stream, id.to_string()).await?;
66                }
67            } else {
68                if !self.relaxed {
69                    return Err(Error::WorkerNoId);
70                }
71            }
72        }
73        Ok(())
74    }
75}