Skip to main content

fez/protocol/
client.rs

1use crate::error::{FezError, Result};
2use crate::protocol::frame::{read_frame, write_frame, Frame};
3use crate::protocol::message::{Control, DbusCall, DbusResponse, IncomingControl};
4use crate::transport::Transport;
5use serde_json::{json, Value};
6use std::process::{Child, Stdio};
7use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
8use std::thread;
9use std::time::Duration;
10
11const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
12
13/// A live connection to a spawned bridge process, multiplexing D-Bus and
14/// stream channels over its stdio.
15pub struct BridgeClient {
16    child: Child,
17    stdin: std::process::ChildStdin,
18    rx: Receiver<Frame>,
19    host: String,
20    next_channel: u64,
21}
22
23impl BridgeClient {
24    /// Spawn the bridge via `transport`, perform the init handshake, and return
25    /// a ready client.
26    pub fn connect(transport: &dyn Transport) -> Result<BridgeClient> {
27        let mut cmd = transport.command();
28        let program = cmd.get_program().to_string_lossy().into_owned();
29        cmd.stdin(Stdio::piped())
30            .stdout(Stdio::piped())
31            .stderr(Stdio::piped());
32        let mut child = cmd
33            .spawn()
34            .map_err(|source| FezError::Spawn { program, source })?;
35        let stdin = child.stdin.take().expect("piped stdin");
36        let mut stdout = child.stdout.take().expect("piped stdout");
37
38        let (tx, rx) = mpsc::channel::<Frame>();
39        thread::spawn(move || {
40            while let Ok(Some(frame)) = read_frame(&mut stdout) {
41                if tx.send(frame).is_err() {
42                    break;
43                }
44            }
45        });
46
47        let mut client = BridgeClient {
48            child,
49            stdin,
50            rx,
51            host: transport.host_label(),
52            next_channel: 1,
53        };
54        client.send_control(&Control::Init {
55            version: 1,
56            host: "localhost".into(),
57            // Start the sudo superuser peer up front so later
58            // `superuser: "require"` channels (mutations) can route to root.
59            superuser: Some(json!({"id": "sudo"})),
60        })?;
61        client.await_init()?;
62        Ok(client)
63    }
64
65    fn send_control(&mut self, c: &Control) -> Result<()> {
66        write_frame(&mut self.stdin, &Frame::control(&c.to_json())).map_err(FezError::Io)
67    }
68
69    fn recv(&self) -> Result<Frame> {
70        match self.rx.recv_timeout(DEFAULT_TIMEOUT) {
71            Ok(f) => Ok(f),
72            Err(RecvTimeoutError::Timeout) => Err(FezError::Timeout),
73            Err(RecvTimeoutError::Disconnected) => Err(FezError::BridgeClosed),
74        }
75    }
76
77    fn await_init(&mut self) -> Result<()> {
78        loop {
79            let frame = self.recv()?;
80            if frame.channel.is_empty() {
81                let c: IncomingControl =
82                    serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
83                if c.command == "init" {
84                    return Ok(());
85                }
86            }
87        }
88    }
89
90    fn alloc_channel(&mut self) -> String {
91        let c = format!("c{}", self.next_channel);
92        self.next_channel += 1;
93        c
94    }
95
96    /// Open an unprivileged D-Bus channel to `name` and return its channel id.
97    pub fn dbus_open(&mut self, name: &str) -> Result<String> {
98        self.open_dbus(name, false)
99    }
100
101    /// Open a privileged D-Bus channel (`superuser: "require"`); the bridge
102    /// performs the sudo/polkit escalation and spawns a root peer (Section 5).
103    pub fn dbus_open_privileged(&mut self, name: &str) -> Result<String> {
104        self.open_dbus(name, true)
105    }
106
107    fn open_dbus(&mut self, name: &str, privileged: bool) -> Result<String> {
108        let channel = self.alloc_channel();
109        let mut open = Control::open(&channel, "dbus-json3")
110            .opt("bus", json!("system"))
111            .opt("name", json!(name));
112        if privileged {
113            open = open.opt("superuser", json!("require"));
114        }
115        self.send_control(&open)?;
116        Ok(channel)
117    }
118
119    /// Returns the out-argument array (`reply[0]`). Index `[0]` for the first return value.
120    pub fn dbus_call(
121        &mut self,
122        channel: &str,
123        path: &str,
124        iface: &str,
125        method: &str,
126        args: Value,
127    ) -> Result<Value> {
128        let call = DbusCall::new(channel, path, iface, method, args);
129        write_frame(&mut self.stdin, &Frame::new(channel, call.to_json())).map_err(FezError::Io)?;
130        loop {
131            let frame = self.recv()?;
132            if frame.channel.is_empty() {
133                let c: IncomingControl =
134                    serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
135                if c.command == "close" && c.channel.as_deref() == Some(channel) {
136                    return Err(FezError::Problem(
137                        c.problem.unwrap_or_else(|| "channel-closed".into()),
138                    ));
139                }
140                continue;
141            }
142            if frame.channel != channel {
143                continue;
144            }
145            let resp: DbusResponse =
146                serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
147            if resp.id.as_deref() != Some(&call.id) {
148                continue; // signal/notify or stale; ignore
149            }
150            if let Some(name) = resp.dbus_error_name() {
151                return Err(FezError::Dbus {
152                    name: name.into(),
153                    message: resp.dbus_error_message().unwrap_or_default(),
154                });
155            }
156            return Ok(resp.out_args().cloned().unwrap_or(Value::Null));
157        }
158    }
159
160    /// Open a `stream` channel running `argv` and buffer its output until `done`.
161    pub fn stream_collect(&mut self, argv: &[&str]) -> Result<Vec<u8>> {
162        let channel = self.alloc_channel();
163        self.send_control(&Control::open(&channel, "stream").opt("spawn", json!(argv)))?;
164        let mut buf = Vec::new();
165        loop {
166            let frame = self.recv()?;
167            if frame.channel == channel {
168                buf.extend_from_slice(&frame.payload);
169            } else if frame.channel.is_empty() {
170                let c: IncomingControl =
171                    serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
172                if c.channel.as_deref() == Some(&channel) {
173                    if c.command == "close" {
174                        if let Some(p) = c.problem {
175                            return Err(FezError::Problem(p));
176                        }
177                    }
178                    if c.command == "done" || c.command == "close" {
179                        return Ok(buf);
180                    }
181                }
182            }
183        }
184    }
185
186    /// Open a `stream` channel and invoke `on_chunk` for each data frame until `done`.
187    pub fn stream_each<F: FnMut(&[u8])>(&mut self, argv: &[&str], mut on_chunk: F) -> Result<()> {
188        let channel = self.alloc_channel();
189        self.send_control(&Control::open(&channel, "stream").opt("spawn", json!(argv)))?;
190        loop {
191            let frame = self.recv()?;
192            if frame.channel == channel {
193                on_chunk(&frame.payload);
194            } else if frame.channel.is_empty() {
195                let c: IncomingControl =
196                    serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
197                if c.channel.as_deref() == Some(&channel) {
198                    if c.command == "close" {
199                        if let Some(p) = c.problem {
200                            return Err(FezError::Problem(p));
201                        }
202                    }
203                    if c.command == "done" || c.command == "close" {
204                        return Ok(());
205                    }
206                }
207            }
208        }
209    }
210
211    /// The host label associated with this connection.
212    pub fn host(&self) -> &str {
213        &self.host
214    }
215}
216
217impl Drop for BridgeClient {
218    fn drop(&mut self) {
219        let _ = self.child.kill();
220        let _ = self.child.wait();
221    }
222}