psup_impl/worker.rs
1//! Worker is a process performing a long-running task.
2use super::{Error, Result, SOCKET, WORKER_ID};
3use futures::Future;
4use tokio::net::UnixStream;
5
6/// Worker process handler.
7pub struct Worker<H, F>
8where
9 H: Fn(UnixStream, String) -> F,
10 F: Future<Output = Result<()>>,
11{
12 handler: Option<H>,
13 relaxed: bool,
14}
15
16impl<H, F> Worker<H, F>
17where
18 H: Fn(UnixStream, String) -> F,
19 F: Future<Output = Result<()>>,
20{
21 /// Create a new worker.
22 pub fn new() -> Self {
23 Self {
24 handler: None,
25 relaxed: false,
26 }
27 }
28
29 /// Set the relaxed flag.
30 ///
31 /// When a worker is relaxed it will start with or without a supervisor.
32 ///
33 /// The default is `false` so workers expect to be run in the context
34 /// of a supervisor and it is an error if no worker id is available in
35 /// the environment.
36 ///
37 /// When this flag is enabled and the required environment variables
38 /// do not exist the worker does not attempt to connect to a supervisor.
39 ///
40 /// Use this mode of operation when a worker process can be run standalone
41 /// or as a worker for a supervisor process.
42 pub fn relaxed(mut self, flag: bool) -> Self {
43 self.relaxed = flag;
44 self
45 }
46
47 /// Set a client connection handler.
48 ///
49 /// The handler function receives the socket stream and opaque
50 /// worker identifier and can communicate with the supervisor using
51 /// the socket stream.
52 pub fn client(mut self, handler: H) -> Self {
53 self.handler = Some(handler);
54 self
55 }
56
57 /// Start this worker running.
58 pub async fn run(&self) -> Result<()> {
59 if let Some(ref handler) = self.handler {
60 if let Some(id) = std::env::var(WORKER_ID).ok() {
61 // If we were given a socket path make the connection.
62 if let Some(path) = std::env::var(SOCKET).ok() {
63 // Connect to the supervisor socket
64 let stream = UnixStream::connect(&path).await?;
65 (handler)(stream, id.to_string()).await?;
66 }
67 } else {
68 if !self.relaxed {
69 return Err(Error::WorkerNoId);
70 }
71 }
72 }
73 Ok(())
74 }
75}