sozu 2.1.0

sozu, a fast, reliable, hot reconfigurable HTTP reverse proxy
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
#[cfg(target_os = "freebsd")]
use std::{ffi::c_void, iter::repeat, mem::size_of};
use std::{
    fs::File,
    io::{Error as IoError, Seek},
    net::SocketAddr,
    os::unix::{
        io::{AsRawFd, FromRawFd, IntoRawFd},
        process::CommandExt,
    },
    process::Command,
};

use libc::pid_t;
use mio::net::UnixStream;
use nix::{
    errno::Errno,
    unistd::{ForkResult, fork},
};
use sozu_command_lib::{
    channel::{Channel, ChannelError},
    config::{Config, MetricDetailLevel},
    logging::{AccessLogFormat, LogError, setup_logging},
    proto::command::{MetricDetail, ServerConfig, WorkerRequest, WorkerResponse},
    ready::Ready,
    request::{RequestError, read_initial_state_from_file},
    scm_socket::{Listeners, ScmSocket, ScmSocketError},
    state::{ConfigState, StateError},
};
use sozu_lib::{
    metrics::{self, MetricError},
    server::{Server, ServerError as LibServerError},
};
use tempfile::tempfile;

use crate::util::{self, UtilError};

#[derive(thiserror::Error, Debug)]
pub enum WorkerError {
    #[error("could not read on the channel")]
    ReadChannel(ChannelError),
    #[error("could not parse configuration from temporary file: {0}")]
    ReadRequestsFromFile(RequestError),
    #[error("could not setup metrics on new worker: {0}")]
    SetupMetrics(MetricError),
    #[error("could not create new worker from config: {0}")]
    NewServerFromConfig(LibServerError),
    #[error("could not create {kind} scm socket: {scm_err}")]
    CreateScmSocket {
        kind: String,
        scm_err: ScmSocketError,
    },
    #[error("could not create temporary file to pass the state to the new worker: {0}")]
    CreateStateFile(IoError),
    #[error("could not disable cloexec on {fd_name}'s file descriptor: {util_err}")]
    DisableCloexec {
        fd_name: String,
        util_err: UtilError,
    },
    #[error("could not write state to temporary file: {0}")]
    WriteStateFile(StateError),
    #[error("could not rewind the temporary state file: {0}")]
    Rewind(IoError),
    #[error("could not create MIO pair of unix stream: {0}")]
    CreateUnixStream(IoError),
    #[error("could not send config to the new worker: {0}")]
    SendConfig(ChannelError),
    #[error("unix fork failed: {0}")]
    Fork(Errno),
    #[error("Could not set the worker-to-main channel to {state}: {channel_err}")]
    SetChannel {
        state: String,
        channel_err: ChannelError,
    },
    #[error("could not setup the logger: {0}")]
    SetupLogging(LogError),
    #[error("could not spawn child process: {0}")]
    SpawnChild(std::io::Error),
}

/// Called within a worker process, this starts the actual proxy.
/// Only called from the binary entry point (main.rs), not from the library.
///
/// The worker rebuilds itself from three descriptors inherited from the main
/// process:
/// - `worker_to_main_channel_fd` (`--fd`): the command channel. The
///   [`ServerConfig`] is read from it first; it is then handed to the
///   [`Server`] as a `Channel<WorkerResponse, WorkerRequest>`.
/// - `worker_to_main_scm_fd` (`--scm`): the SCM socket over which the main
///   process passes listener descriptors.
/// - `configuration_state_fd` (`--configuration-state-fd`): the file holding
///   the initial configuration state.
///
/// `id` is the numeric worker id, rendered as `WRK-<id>` (zero-padded to two
/// digits) for logging and metrics. `command_buffer_size` and
/// `max_command_buffer_size` size the command channel buffers.
///
/// Runs the server's event loop and returns `Ok(())` once it ends.
///
/// # Errors
/// Returns a [`WorkerError`] if the command channel cannot be switched between
/// blocking and non-blocking mode or read, if logging or metrics cannot be set
/// up, if the initial state cannot be parsed, or if the SCM socket or the
/// server cannot be created.
///
/// # Panics
/// Panics if the metrics address in the received configuration does not parse
/// as a [`SocketAddr`].
#[allow(dead_code)]
pub fn begin_worker_process(
    worker_to_main_channel_fd: i32,
    worker_to_main_scm_fd: i32,
    configuration_state_fd: i32,
    id: i32,
    command_buffer_size: u64,
    max_command_buffer_size: u64,
) -> Result<(), WorkerError> {
    // Debug-only sanity checks on the inherited descriptors. The main process
    // derives each one from a live `as_raw_fd()` in `fork_main_into_worker`, so
    // they are non-negative and refer to three different kernel objects
    // (command channel, SCM socket, state file). In release builds a bad
    // descriptor surfaces below as a returned channel / file error, never a
    // panic.
    debug_assert!(
        worker_to_main_channel_fd >= 0,
        "inherited worker-to-main channel fd must be a valid descriptor"
    );
    debug_assert!(
        worker_to_main_scm_fd >= 0,
        "inherited worker-to-main SCM fd must be a valid descriptor"
    );
    debug_assert!(
        configuration_state_fd >= 0,
        "inherited configuration-state fd must be a valid descriptor"
    );
    debug_assert!(
        !(worker_to_main_channel_fd == worker_to_main_scm_fd
            || worker_to_main_channel_fd == configuration_state_fd
            || worker_to_main_scm_fd == configuration_state_fd),
        "the channel, SCM and state descriptors must be three distinct fds, never aliased"
    );

    // SAFETY: `worker_to_main_channel_fd` was inherited across `exec` from the
    // main process via the `--fd` CLI argument. Nothing else in this
    // freshly-execed worker owns it, so the `UnixStream` takes ownership and
    // closes it on drop.
    let inherited_stream = unsafe { UnixStream::from_raw_fd(worker_to_main_channel_fd) };
    let mut command_channel: Channel<WorkerResponse, ServerConfig> =
        Channel::new(inherited_stream, command_buffer_size, max_command_buffer_size);

    // Blocking mode, so that `read_message` below waits for the configuration.
    command_channel
        .blocking()
        .map_err(|channel_err| WorkerError::SetChannel {
            state: "blocking".to_string(),
            channel_err,
        })?;

    // SAFETY: `configuration_state_fd` was inherited across `exec` from the
    // main process via the `--configuration-state-fd` CLI argument. Nothing
    // else in this freshly-execed worker owns it, so the `File` takes
    // ownership and closes it on drop.
    let mut state_file = unsafe { File::from_raw_fd(configuration_state_fd) };

    let server_config = command_channel
        .read_message()
        .map_err(WorkerError::ReadChannel)?;

    let display_id = format!("{}-{:02}", "WRK", id);

    // The display id is built only from the numeric `id` assigned by the main
    // process (`--id`). It must keep the canonical `WRK-` prefix and embed that
    // id, so that main-process and worker logs can be correlated.
    debug_assert!(
        display_id.starts_with("WRK-"),
        "worker display id must carry the canonical WRK- prefix"
    );
    debug_assert!(
        id < 0 || display_id.ends_with(&id.to_string()),
        "worker display id must embed the numeric worker id handed in by the master"
    );

    // Nothing may be logged before the logger is set up, or the logger panics.
    let access_log_format = AccessLogFormat::from(&server_config.access_log_format());
    setup_logging(
        &server_config.log_target,
        server_config.log_colored,
        server_config.access_logs_target.as_deref(),
        Some(access_log_format),
        Some(server_config.log_colored),
        &server_config.log_level,
        &display_id,
    )
    .map_err(WorkerError::SetupLogging)?;

    trace!(
        "Creating worker {} with config: {:#?}",
        display_id, server_config
    );
    info!("worker {} starting...", id);

    let initial_state = read_initial_state_from_file(&mut state_file)
        .map_err(WorkerError::ReadRequestsFromFile)?;

    // Back to non-blocking mode: from here on the channel belongs to the
    // server's event loop.
    command_channel
        .nonblocking()
        .map_err(|channel_err| WorkerError::SetChannel {
            state: "nonblocking".to_string(),
            channel_err,
        })?;

    let mut command_channel: Channel<WorkerResponse, WorkerRequest> = command_channel.into();
    command_channel.readiness.insert(Ready::READABLE);

    if let Some(metrics_config) = &server_config.metrics {
        let metrics_address: SocketAddr = metrics_config
            .address
            .parse()
            .expect("Could not parse metrics address");
        // The detail level travels in the `ServerConfig` read from the command
        // channel as a raw proto value (`Option<i32>`). A missing or
        // unrecognised value falls back to the historical default level, so
        // old binaries on either side keep working.
        let detail_level = match metrics_config.detail.map(MetricDetail::try_from) {
            Some(Ok(wire_detail)) => MetricDetailLevel::from(wire_detail),
            _ => MetricDetailLevel::default(),
        };
        metrics::setup(
            &metrics_address,
            display_id,
            metrics_config.tagged_metrics,
            metrics_config.prefix.clone(),
            detail_level,
        )
        .map_err(WorkerError::SetupMetrics)?;
    }

    let scm_socket = ScmSocket::new(worker_to_main_scm_fd).map_err(|scm_err| {
        WorkerError::CreateScmSocket {
            kind: "worker-to-main".to_string(),
            scm_err,
        }
    })?;

    let mut server = Server::try_new_from_config(
        command_channel,
        scm_socket,
        server_config,
        initial_state,
        true,
    )
    .map_err(WorkerError::NewServerFromConfig)?;

    info!("starting event loop");
    server.run();
    info!("ending event loop");
    Ok(())
}

/// unix-forks the main process
///
/// - Parent: sends config, state and listeners to the new worker
/// - Child: calls the sozu executable path like so: `sozu worker --id <worker_id> [...]`
///
/// returns the child process pid, and channels to talk to it.
pub fn fork_main_into_worker(
    worker_id: &str,
    config: &Config,
    executable_path: String,
    state: &ConfigState,
    listeners: Option<Listeners>,
) -> Result<(pid_t, Channel<WorkerRequest, WorkerResponse>, ScmSocket), WorkerError> {
    // SAFETY: `libc::getpid` takes no input pointers, never fails, and
    // returns a value type. No invariant beyond "FFI signature matches libc".
    trace!("parent({})", unsafe { libc::getpid() });

    let mut state_file = tempfile().map_err(WorkerError::CreateStateFile)?;
    util::disable_close_on_exec(state_file.as_raw_fd()).map_err(|util_err| {
        WorkerError::DisableCloexec {
            fd_name: "state_file".to_string(),
            util_err,
        }
    })?;

    state
        .write_initial_state_to_file(&mut state_file)
        .map_err(WorkerError::WriteStateFile)?;

    state_file.rewind().map_err(WorkerError::Rewind)?;

    let (main_to_worker, worker_to_main) =
        UnixStream::pair().map_err(WorkerError::CreateUnixStream)?;
    let (main_to_worker_scm, worker_to_main_scm) =
        UnixStream::pair().map_err(WorkerError::CreateUnixStream)?;

    let main_to_worker_scm =
        ScmSocket::new(main_to_worker_scm.into_raw_fd()).map_err(|scm_err| {
            WorkerError::CreateScmSocket {
                kind: "main-to-worker".to_string(),
                scm_err,
            }
        })?;

    util::disable_close_on_exec(worker_to_main.as_raw_fd()).map_err(|util_err| {
        WorkerError::DisableCloexec {
            fd_name: "worker-to-main".to_string(),
            util_err,
        }
    })?;
    util::disable_close_on_exec(worker_to_main_scm.as_raw_fd()).map_err(|util_err| {
        WorkerError::DisableCloexec {
            fd_name: "worker-to-main-scm".to_string(),
            util_err,
        }
    })?;

    // The three descriptors the worker child will inherit via its CLI args are
    // distinct kernel objects — none handed off twice: the command channel
    // (`worker_to_main`, `--fd`), the SCM socket (`worker_to_main_scm`,
    // `--scm`) and the state file (`--configuration-state-fd`). A collision
    // would alias two roles onto one fd after exec. (The SCM listener fds
    // themselves travel separately over `send_listeners`, not as CLI args.)
    debug_assert!(
        worker_to_main.as_raw_fd() != worker_to_main_scm.as_raw_fd()
            && worker_to_main.as_raw_fd() != state_file.as_raw_fd()
            && worker_to_main_scm.as_raw_fd() != state_file.as_raw_fd(),
        "worker channel, SCM and state descriptors must be three distinct fds, never aliased"
    );

    let worker_config = ServerConfig::from(config);

    let mut main_to_worker_channel: Channel<ServerConfig, WorkerResponse> = Channel::new(
        main_to_worker,
        worker_config.command_buffer_size,
        worker_config.max_command_buffer_size,
    );

    // DISCUSS: should we really block the channel just to write on it?
    if let Err(e) = main_to_worker_channel.blocking() {
        error!("Could not block the main-to-worker channel: {}", e);
    }

    info!("launching worker {}", worker_id);
    debug!("executable path is {}", executable_path);

    // SAFETY: `fork` is unsafe because the child must avoid touching
    // shared mutable state inherited from the parent. The child branch
    // below restricts itself to `Command::exec`, which replaces the process
    // image entirely — no inherited state matters after that point.
    match unsafe { fork().map_err(WorkerError::Fork)? } {
        ForkResult::Parent { child: worker_pid } => {
            // A successful `fork()` returns the child's pid in the parent
            // branch, which is always strictly positive (0 is the child
            // branch, < 0 never reaches here — `fork` would have returned
            // `Err`). A non-positive pid here would mean we mis-classified
            // the fork outcome and are about to register a bogus worker.
            debug_assert!(
                worker_pid.as_raw() > 0,
                "fork parent branch must observe a strictly positive child pid"
            );
            info!("launching worker {} with pid {}", worker_id, worker_pid);
            main_to_worker_channel
                .write_message(&worker_config)
                .map_err(WorkerError::SendConfig)?;

            main_to_worker_channel
                .nonblocking()
                .map_err(|channel_err| WorkerError::SetChannel {
                    state: "nonblocking".to_string(),
                    channel_err,
                })?;

            if let Some(listeners) = listeners {
                info!("sending listeners to new worker: {:?}", listeners);
                let result = main_to_worker_scm.send_listeners(&listeners);
                info!("sent listeners from main: {:?}", result);
                listeners.close();
            };

            util::disable_close_on_exec(main_to_worker_scm.fd).map_err(|util_err| {
                WorkerError::DisableCloexec {
                    fd_name: "main-to-worker-main-scm".to_string(),
                    util_err,
                }
            })?;

            Ok((
                worker_pid.into(),
                main_to_worker_channel.into(),
                main_to_worker_scm,
            ))
        }
        ForkResult::Child => {
            // SAFETY: `libc::getpid` takes no input pointers, never fails,
            // and returns a value type. No invariant beyond "FFI signature
            // matches libc".
            trace!("child({}):\twill spawn a child", unsafe { libc::getpid() });

            // #515: prefer fd-based exec (`/proc/self/fd/<n>` opened with
            // `O_PATH` on `/proc/self/exe`) so that a worker spawn after
            // the on-disk binary at `executable_path` was replaced still
            // execs the **original** inode the master was started from —
            // race-free against `cp new-sozu /usr/bin/sozu` between master
            // startup and worker auto-restart. The forked child inherits
            // the parent's mmap'd image, so `/proc/self/exe` in the child
            // resolves to the same original inode.
            let exec_path = match crate::util::get_executable_exec_path() {
                Ok(p) => p,
                Err(e) => {
                    error!(
                        "could not open /proc/self/exe O_PATH ({}); falling back to path-based exec — \
                         vulnerable to on-disk binary replacement race per #515",
                        e
                    );
                    executable_path.to_owned()
                }
            };
            let err = Command::new(&exec_path)
                .arg("worker")
                .arg("--id")
                .arg(worker_id)
                .arg("--fd")
                .arg(worker_to_main.as_raw_fd().to_string())
                .arg("--scm")
                .arg(worker_to_main_scm.as_raw_fd().to_string())
                .arg("--configuration-state-fd")
                .arg(state_file.as_raw_fd().to_string())
                .arg("--command-buffer-size")
                .arg(config.command_buffer_size.to_string())
                .arg("--max-command-buffer-size")
                .arg(config.max_command_buffer_size.to_string())
                .exec();

            error!("Failed to spawn child process: {}", err);
            Err(WorkerError::SpawnChild(err))
        }
    }
}