Skip to main content

fez/protocol/
client.rs

1use crate::error::{FezError, Result};
2use crate::protocol::frame::{read_frame, write_frame, Frame};
3use crate::protocol::message::{Control, DbusCall, DbusResponse, DbusSignal, IncomingControl};
4use crate::transport::Transport;
5use serde_json::{json, Value};
6use std::io;
7use std::process::{Child, Stdio};
8use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
9use std::thread;
10use std::time::Duration;
11
12const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
13
14/// Object path of the bridge's superuser controller on the internal bus.
15///
16/// cockpit-bridge exports `SuperuserRoutingRule` at `/superuser` on its
17/// in-process internal bus (`bridge.py`:
18/// `self.internal_bus.export('/superuser', self.superuser_rule)`), with
19/// interface [`SUPERUSER_IFACE`].
20const SUPERUSER_PATH: &str = "/superuser";
21
22/// D-Bus interface of the bridge's superuser controller.
23///
24/// Defined in cockpit's `superuser.py`
25/// (`SuperuserRoutingRule(..., interface='cockpit.Superuser')`). It exposes the
26/// `Bridges` property (`as`, the ordered list of viable escalation mechanisms)
27/// and the `Start(s)` method (start a named mechanism).
28const SUPERUSER_IFACE: &str = "cockpit.Superuser";
29
30/// Guidance returned when privilege escalation to root fails.
31///
32/// fez tries every escalation mechanism the bridge advertises (sudo, polkit)
33/// and only reports this after all of them fail. The common causes: the
34/// standalone bridge ships no superuser bridge definitions (install
35/// `cockpit-system`), sudo wants a password fez does not supply (configure
36/// passwordless sudo), or no polkit rule grants this user the privileged
37/// action. The message names both mechanisms so the operator knows either path
38/// is viable.
39const ESCALATION_REMEDIATION: &str = "fez could not escalate to root: no superuser mechanism succeeded. Install the cockpit-system package (it ships the sudo/pkexec superuser bridge definitions), then either configure passwordless sudo (NOPASSWD) for this user or grant a polkit rule allowing this user the privileged cockpit action, and retry. fez does not supply sudo passwords";
40
41/// A live connection to a spawned bridge process, multiplexing D-Bus and
42/// stream channels over its stdio.
43pub struct BridgeClient {
44    child: Child,
45    stdin: std::process::ChildStdin,
46    rx: Receiver<Frame>,
47    host: String,
48    next_channel: u64,
49    /// Whether a root peer has been brought up via `cockpit.Superuser.Start`.
50    /// Escalation is performed lazily and at most once per connection.
51    escalated: bool,
52}
53
54impl BridgeClient {
55    /// Spawn the bridge via `transport`, perform the init handshake, and return
56    /// a ready client.
57    pub fn connect(transport: &dyn Transport) -> Result<BridgeClient> {
58        let mut cmd = transport.command();
59        let program = cmd.get_program().to_string_lossy().into_owned();
60        cmd.stdin(Stdio::piped())
61            .stdout(Stdio::piped())
62            .stderr(Stdio::piped());
63        let mut child = cmd
64            .spawn()
65            .map_err(|source| FezError::Spawn { program, source })?;
66        let stdin = child.stdin.take().expect("piped stdin");
67        let mut stdout = child.stdout.take().expect("piped stdout");
68        let mut stderr = child.stderr.take().expect("piped stderr");
69
70        // Always consume stderr so a noisy bridge or SSH transport cannot block
71        // on a full pipe while the client waits for stdout frames.
72        let _stderr_drain = thread::spawn(move || {
73            let _ = io::copy(&mut stderr, &mut io::sink());
74        });
75
76        let (tx, rx) = mpsc::channel::<Frame>();
77        thread::spawn(move || {
78            while let Ok(Some(frame)) = read_frame(&mut stdout) {
79                if tx.send(frame).is_err() {
80                    break;
81                }
82            }
83        });
84
85        let mut client = BridgeClient {
86            child,
87            stdin,
88            rx,
89            host: transport.host_label(),
90            next_channel: 1,
91            escalated: false,
92        };
93        client.send_control(&Control::Init {
94            version: 1,
95            host: "localhost".into(),
96            // Defer escalation: bring up no root peer at init. fez selects a
97            // working mechanism later via `escalate()` (cockpit.Superuser.Start)
98            // so it can fall through sudo -> polkit instead of pinning sudo.
99            superuser: Some(json!("none")),
100        })?;
101        client.await_init()?;
102        Ok(client)
103    }
104
105    fn send_control(&mut self, c: &Control) -> Result<()> {
106        write_frame(&mut self.stdin, &Frame::control(&c.to_json())).map_err(FezError::Io)
107    }
108
109    fn recv(&self) -> Result<Frame> {
110        match self.rx.recv_timeout(DEFAULT_TIMEOUT) {
111            Ok(f) => Ok(f),
112            Err(RecvTimeoutError::Timeout) => Err(FezError::Timeout),
113            Err(RecvTimeoutError::Disconnected) => Err(FezError::BridgeClosed),
114        }
115    }
116
117    /// Complete the bridge handshake.
118    ///
119    /// Waits for the bridge's `init` reply, which completes the handshake.
120    /// Because we send `init` with `superuser: "none"`, the bridge brings up no
121    /// root peer at init time and runs no superuser negotiation, so it emits no
122    /// `superuser-init-done` (cockpit's `SuperuserRoutingRule.init` is only
123    /// invoked, and only then fires `superuser-init-done`, when init carries a
124    /// `superuser` object). Waiting for that message here would hang against a
125    /// real bridge. Escalation is deferred to [`escalate`], run lazily before
126    /// the first privileged channel open.
127    ///
128    /// [`escalate`]: BridgeClient::escalate
129    fn await_init(&mut self) -> Result<()> {
130        loop {
131            let frame = self.recv()?;
132            if !frame.channel.is_empty() {
133                continue;
134            }
135            let c: IncomingControl =
136                serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
137            // The bridge's `init` reply opens the transport. We deferred
138            // escalation (`superuser: "none"`), so there is no further superuser
139            // negotiation to await; the handshake is done.
140            if c.command == "init" {
141                return Ok(());
142            }
143        }
144    }
145
146    fn alloc_channel(&mut self) -> String {
147        let c = format!("c{}", self.next_channel);
148        self.next_channel += 1;
149        c
150    }
151
152    /// Open an unprivileged D-Bus channel to `name` and return its channel id.
153    pub fn dbus_open(&mut self, name: &str) -> Result<String> {
154        self.open_dbus(name, false)
155    }
156
157    /// Open a privileged D-Bus channel (`superuser: "require"`); the bridge
158    /// performs the sudo/polkit escalation and spawns a root peer (Section 5).
159    pub fn dbus_open_privileged(&mut self, name: &str) -> Result<String> {
160        self.open_dbus(name, true)
161    }
162
163    fn open_dbus(&mut self, name: &str, privileged: bool) -> Result<String> {
164        // A privileged channel routes to a root peer, which only exists once we
165        // have escalated. Drive escalation lazily before the first such open;
166        // reads (privileged == false) never escalate.
167        if privileged && !self.escalated {
168            self.escalate()?;
169        }
170        let channel = self.alloc_channel();
171        let mut open = Control::open(&channel, "dbus-json3")
172            .opt("bus", json!("system"))
173            .opt("name", json!(name));
174        if privileged {
175            open = open.opt("superuser", json!("require"));
176        }
177        self.send_control(&open)?;
178        Ok(channel)
179    }
180
181    /// Bring up a root peer by selecting a working escalation mechanism.
182    ///
183    /// With init sent as `superuser: "none"`, no root peer exists until fez
184    /// asks for one. This reads the bridge's advertised mechanisms
185    /// ([`BridgeClient::superuser_bridges`]) and tries each via
186    /// [`BridgeClient::superuser_start`] in order until one succeeds, so a host
187    /// with password-only sudo but a working polkit rule still escalates. The
188    /// `FEZ_ESCALATION` environment variable overrides the default loop:
189    /// `off` disables escalation, and any other value forces that single
190    /// mechanism (no fall-through). Idempotent: a no-op once escalated.
191    ///
192    /// # Errors
193    ///
194    /// Returns [`FezError::AccessDenied`] (exit 11) when no mechanism succeeds,
195    /// when the host advertises none, or when `FEZ_ESCALATION=off`. Propagates
196    /// any non-`Dbus` transport error encountered while talking to the bridge.
197    pub fn escalate(&mut self) -> Result<()> {
198        if self.escalated {
199            return Ok(());
200        }
201        let denied = || FezError::AccessDenied {
202            remediation: ESCALATION_REMEDIATION.into(),
203        };
204        match std::env::var("FEZ_ESCALATION").ok().as_deref() {
205            // Never escalate. Mutations fail; reads are unaffected because they
206            // never call escalate().
207            Some("off") => return Err(denied()),
208            // Force a single named mechanism with no fall-through.
209            Some(name) if !name.is_empty() => {
210                return match self.superuser_start(name) {
211                    Ok(()) => {
212                        self.escalated = true;
213                        Ok(())
214                    }
215                    Err(FezError::Dbus { .. }) => Err(denied()),
216                    Err(e) => Err(e),
217                };
218            }
219            // Empty or unset: default transparent loop below.
220            _ => {}
221        }
222        let names = self.superuser_bridges()?;
223        for name in names {
224            match self.superuser_start(&name) {
225                Ok(()) => {
226                    self.escalated = true;
227                    return Ok(());
228                }
229                // This mechanism could not start (e.g. it needs an unanswerable
230                // credential); try the next advertised one.
231                Err(FezError::Dbus { .. }) => continue,
232                Err(e) => return Err(e),
233            }
234        }
235        Err(denied())
236    }
237
238    /// Open a D-Bus channel to the bridge's internal bus.
239    ///
240    /// The internal bus hosts the bridge's own controllers (notably
241    /// `cockpit.Superuser`). It carries no `name` (the bridge is the peer) and
242    /// is never privileged: the controller decides escalation, it is not itself
243    /// reached through a root peer (cockpit `dbus.py`: `bus == 'internal'`).
244    fn open_dbus_internal(&mut self) -> Result<String> {
245        let channel = self.alloc_channel();
246        let open = Control::open(&channel, "dbus-json3").opt("bus", json!("internal"));
247        self.send_control(&open)?;
248        Ok(channel)
249    }
250
251    /// List the escalation mechanisms the bridge considers viable on this host.
252    ///
253    /// Reads the `cockpit.Superuser` `Bridges` property (signature `as`) over
254    /// the internal bus. The list is the bridge's own ordered, validity-filtered
255    /// set of mechanism names (e.g. `["sudo", "pkexec"]`); an empty list means
256    /// the host has no usable escalation mechanism.
257    ///
258    /// # Errors
259    ///
260    /// Returns [`FezError::Dbus`] if the property read fails, or any transport
261    /// error from opening the internal channel or reading the reply.
262    pub fn superuser_bridges(&mut self) -> Result<Vec<String>> {
263        let channel = self.open_dbus_internal()?;
264        let out = self.dbus_call(
265            &channel,
266            SUPERUSER_PATH,
267            "org.freedesktop.DBus.Properties",
268            "Get",
269            json!([SUPERUSER_IFACE, "Bridges"]),
270        )?;
271        // `dbus_call` returns the out-argument array (`reply[0]`).
272        // `Properties.Get` has a single `v` out-arg, so the `as` value arrives
273        // variant-wrapped: `out = [{"t":"as","v":["sudo",...]}]`. Unwrap the
274        // `{"t","v"}` envelope to reach the array (cockpit-bridge does not
275        // unwrap it for us; treating `out[0]` as the array directly yields an
276        // empty list and a spurious exit-11 deny).
277        let names = out
278            .as_array()
279            .and_then(|args| args.first())
280            .map(variant_value)
281            .and_then(Value::as_array)
282            .map(|arr| {
283                arr.iter()
284                    .filter_map(|v| v.as_str().map(str::to_owned))
285                    .collect()
286            })
287            .unwrap_or_default();
288        Ok(names)
289    }
290
291    /// Ask the bridge to start the named escalation mechanism.
292    ///
293    /// Calls `cockpit.Superuser.Start(name)` over the internal bus. On success
294    /// the bridge has brought up a root peer, and subsequent
295    /// `superuser: "require"` channels route to it. A mechanism that needs a
296    /// credential fez cannot supply surfaces as a D-Bus error, not a hang.
297    ///
298    /// # Errors
299    ///
300    /// Returns [`FezError::Dbus`] when the bridge rejects the start (e.g. the
301    /// mechanism needs an unanswerable credential), or any transport error.
302    pub fn superuser_start(&mut self, name: &str) -> Result<()> {
303        let channel = self.open_dbus_internal()?;
304        self.dbus_call(
305            &channel,
306            SUPERUSER_PATH,
307            SUPERUSER_IFACE,
308            "Start",
309            json!([name]),
310        )?;
311        Ok(())
312    }
313
314    /// Returns the out-argument array (`reply[0]`). Index `[0]` for the first return value.
315    pub fn dbus_call(
316        &mut self,
317        channel: &str,
318        path: &str,
319        iface: &str,
320        method: &str,
321        args: Value,
322    ) -> Result<Value> {
323        let call = DbusCall::new(channel, path, iface, method, args);
324        write_frame(&mut self.stdin, &Frame::new(channel, call.to_json())).map_err(FezError::Io)?;
325        loop {
326            let frame = self.recv()?;
327            if frame.channel.is_empty() {
328                let c: IncomingControl =
329                    serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
330                if c.command == "close" && c.channel.as_deref() == Some(channel) {
331                    return Err(close_problem_to_error(c.problem));
332                }
333                continue;
334            }
335            if frame.channel != channel {
336                continue;
337            }
338            let resp: DbusResponse =
339                serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
340            if resp.id.as_deref() != Some(&call.id) {
341                continue; // signal/notify or stale; ignore
342            }
343            if let Some(name) = resp.dbus_error_name() {
344                return Err(FezError::Dbus {
345                    name: name.into(),
346                    message: resp.dbus_error_message().unwrap_or_default(),
347                });
348            }
349            return Ok(resp.out_args().cloned().unwrap_or(Value::Null));
350        }
351    }
352
353    /// Send a D-Bus method call on `channel` and collect the signals it emits
354    /// until a `Finished` signal (or a channel close) terminates the stream.
355    ///
356    /// PackageKit transactions report their result as a stream of signals on
357    /// the transaction object path rather than as a method reply, so the
358    /// request/reply [`BridgeClient::dbus_call`] cannot observe them. This sends
359    /// the call, then accumulates every `signal` frame on `channel` whose path
360    /// matches `path`, returning the raw `(member, args)` pairs in arrival
361    /// order. The method-call reply itself (an empty reply) is ignored; only
362    /// signals carry the payload. A `Finished` signal ends collection.
363    ///
364    /// # Errors
365    ///
366    /// Returns [`FezError::BridgeClosed`] / [`FezError::Timeout`] on transport
367    /// failure, [`FezError::Decode`] on a malformed frame, or the mapped close
368    /// problem if the channel closes with an error before `Finished`.
369    pub fn dbus_call_collect(
370        &mut self,
371        channel: &str,
372        path: &str,
373        iface: &str,
374        method: &str,
375        args: Value,
376    ) -> Result<Vec<(String, Vec<Value>)>> {
377        let call = DbusCall::new(channel, path, iface, method, args);
378        write_frame(&mut self.stdin, &Frame::new(channel, call.to_json())).map_err(FezError::Io)?;
379        let mut collected: Vec<(String, Vec<Value>)> = Vec::new();
380        loop {
381            let frame = self.recv()?;
382            if frame.channel.is_empty() {
383                let c: IncomingControl =
384                    serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
385                if c.command == "close" && c.channel.as_deref() == Some(channel) {
386                    return Err(close_problem_to_error(c.problem));
387                }
388                continue;
389            }
390            if frame.channel != channel {
391                continue;
392            }
393            // A signal frame? Decode and accumulate; stop on Finished.
394            let sig: DbusSignal =
395                serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
396            let Some(member) = sig.member() else {
397                // Not a signal (e.g. the empty method reply); ignore.
398                continue;
399            };
400            if sig.path() != Some(path) {
401                continue; // signal from a different transaction object
402            }
403            let member = member.to_string();
404            let args = sig.args().cloned().unwrap_or_default();
405            let finished = member == "Finished";
406            collected.push((member, args));
407            if finished {
408                return Ok(collected);
409            }
410        }
411    }
412
413    /// Open a `stream` channel running `argv` and buffer its output until `done`.
414    pub fn stream_collect(&mut self, argv: &[&str]) -> Result<Vec<u8>> {
415        let channel = self.alloc_channel();
416        self.send_control(&Control::open(&channel, "stream").opt("spawn", json!(argv)))?;
417        let mut buf = Vec::new();
418        loop {
419            let frame = self.recv()?;
420            if frame.channel == channel {
421                buf.extend_from_slice(&frame.payload);
422            } else if frame.channel.is_empty() {
423                let c: IncomingControl =
424                    serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
425                if c.channel.as_deref() == Some(&channel) {
426                    if c.command == "close" && c.problem.is_some() {
427                        return Err(close_problem_to_error(c.problem));
428                    }
429                    if c.command == "done" || c.command == "close" {
430                        return Ok(buf);
431                    }
432                }
433            }
434        }
435    }
436
437    /// Open a `stream` channel and invoke `on_chunk` for each data frame until `done`.
438    pub fn stream_each<F: FnMut(&[u8])>(&mut self, argv: &[&str], mut on_chunk: F) -> Result<()> {
439        let channel = self.alloc_channel();
440        self.send_control(&Control::open(&channel, "stream").opt("spawn", json!(argv)))?;
441        loop {
442            let frame = self.recv()?;
443            if frame.channel == channel {
444                on_chunk(&frame.payload);
445            } else if frame.channel.is_empty() {
446                let c: IncomingControl =
447                    serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
448                if c.channel.as_deref() == Some(&channel) {
449                    if c.command == "close" && c.problem.is_some() {
450                        return Err(close_problem_to_error(c.problem));
451                    }
452                    if c.command == "done" || c.command == "close" {
453                        return Ok(());
454                    }
455                }
456            }
457        }
458    }
459
460    /// The host label associated with this connection.
461    pub fn host(&self) -> &str {
462        &self.host
463    }
464}
465
466impl Drop for BridgeClient {
467    fn drop(&mut self) {
468        let _ = self.child.kill();
469        let _ = self.child.wait();
470    }
471}
472
473/// Unwrap a D-Bus variant envelope to its inner value.
474///
475/// cockpit-bridge represents a variant on the wire as `{"t":<sig>,"v":<value>}`
476/// (e.g. a `Properties.Get` out-arg or an `a{sv}` dict value). Return the inner
477/// `v` when present, otherwise the value unchanged, so callers can treat
478/// variant-wrapped and bare values uniformly (same convention as the services
479/// status parser).
480fn variant_value(v: &Value) -> &Value {
481    v.get("v").unwrap_or(v)
482}
483
484/// Convert a channel-close `problem` into the matching [`FezError`].
485///
486/// A privileged channel that the bridge could not escalate closes with
487/// `problem: "access-denied"`; surface that as the dedicated [`FezError::AccessDenied`]
488/// (exit 11, with remediation) instead of a generic channel problem (exit 4),
489/// so privilege failures are distinguishable from missing resources. Any other
490/// problem string keeps the generic [`FezError::Problem`] mapping.
491fn close_problem_to_error(problem: Option<String>) -> FezError {
492    match problem {
493        Some(p) if p == "access-denied" => FezError::AccessDenied {
494            remediation: ESCALATION_REMEDIATION.into(),
495        },
496        Some(p) => FezError::Problem(p),
497        None => FezError::Problem("channel-closed".into()),
498    }
499}