1use crate::error::{FezError, Result};
2use crate::protocol::frame::{read_frame, write_frame, Frame};
3use crate::protocol::message::{Control, DbusCall, DbusResponse, IncomingControl};
4use crate::transport::Transport;
5use serde_json::{json, Value};
6use std::process::{Child, Stdio};
7use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
8use std::thread;
9use std::time::Duration;
10
/// Longest time a single receive waits for the next frame from the bridge
/// before the wait is reported as a timeout.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
12
13pub struct BridgeClient {
16 child: Child,
17 stdin: std::process::ChildStdin,
18 rx: Receiver<Frame>,
19 host: String,
20 next_channel: u64,
21}
22
23impl BridgeClient {
24 pub fn connect(transport: &dyn Transport) -> Result<BridgeClient> {
27 let mut cmd = transport.command();
28 let program = cmd.get_program().to_string_lossy().into_owned();
29 cmd.stdin(Stdio::piped())
30 .stdout(Stdio::piped())
31 .stderr(Stdio::piped());
32 let mut child = cmd
33 .spawn()
34 .map_err(|source| FezError::Spawn { program, source })?;
35 let stdin = child.stdin.take().expect("piped stdin");
36 let mut stdout = child.stdout.take().expect("piped stdout");
37
38 let (tx, rx) = mpsc::channel::<Frame>();
39 thread::spawn(move || {
40 while let Ok(Some(frame)) = read_frame(&mut stdout) {
41 if tx.send(frame).is_err() {
42 break;
43 }
44 }
45 });
46
47 let mut client = BridgeClient {
48 child,
49 stdin,
50 rx,
51 host: transport.host_label(),
52 next_channel: 1,
53 };
54 client.send_control(&Control::Init {
55 version: 1,
56 host: "localhost".into(),
57 superuser: Some(json!({"id": "sudo"})),
60 })?;
61 client.await_init()?;
62 Ok(client)
63 }
64
65 fn send_control(&mut self, c: &Control) -> Result<()> {
66 write_frame(&mut self.stdin, &Frame::control(&c.to_json())).map_err(FezError::Io)
67 }
68
69 fn recv(&self) -> Result<Frame> {
70 match self.rx.recv_timeout(DEFAULT_TIMEOUT) {
71 Ok(f) => Ok(f),
72 Err(RecvTimeoutError::Timeout) => Err(FezError::Timeout),
73 Err(RecvTimeoutError::Disconnected) => Err(FezError::BridgeClosed),
74 }
75 }
76
77 fn await_init(&mut self) -> Result<()> {
78 loop {
79 let frame = self.recv()?;
80 if frame.channel.is_empty() {
81 let c: IncomingControl =
82 serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
83 if c.command == "init" {
84 return Ok(());
85 }
86 }
87 }
88 }
89
90 fn alloc_channel(&mut self) -> String {
91 let c = format!("c{}", self.next_channel);
92 self.next_channel += 1;
93 c
94 }
95
96 pub fn dbus_open(&mut self, name: &str) -> Result<String> {
98 self.open_dbus(name, false)
99 }
100
101 pub fn dbus_open_privileged(&mut self, name: &str) -> Result<String> {
104 self.open_dbus(name, true)
105 }
106
107 fn open_dbus(&mut self, name: &str, privileged: bool) -> Result<String> {
108 let channel = self.alloc_channel();
109 let mut open = Control::open(&channel, "dbus-json3")
110 .opt("bus", json!("system"))
111 .opt("name", json!(name));
112 if privileged {
113 open = open.opt("superuser", json!("require"));
114 }
115 self.send_control(&open)?;
116 Ok(channel)
117 }
118
119 pub fn dbus_call(
121 &mut self,
122 channel: &str,
123 path: &str,
124 iface: &str,
125 method: &str,
126 args: Value,
127 ) -> Result<Value> {
128 let call = DbusCall::new(channel, path, iface, method, args);
129 write_frame(&mut self.stdin, &Frame::new(channel, call.to_json())).map_err(FezError::Io)?;
130 loop {
131 let frame = self.recv()?;
132 if frame.channel.is_empty() {
133 let c: IncomingControl =
134 serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
135 if c.command == "close" && c.channel.as_deref() == Some(channel) {
136 return Err(FezError::Problem(
137 c.problem.unwrap_or_else(|| "channel-closed".into()),
138 ));
139 }
140 continue;
141 }
142 if frame.channel != channel {
143 continue;
144 }
145 let resp: DbusResponse =
146 serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
147 if resp.id.as_deref() != Some(&call.id) {
148 continue; }
150 if let Some(name) = resp.dbus_error_name() {
151 return Err(FezError::Dbus {
152 name: name.into(),
153 message: resp.dbus_error_message().unwrap_or_default(),
154 });
155 }
156 return Ok(resp.out_args().cloned().unwrap_or(Value::Null));
157 }
158 }
159
160 pub fn stream_collect(&mut self, argv: &[&str]) -> Result<Vec<u8>> {
162 let channel = self.alloc_channel();
163 self.send_control(&Control::open(&channel, "stream").opt("spawn", json!(argv)))?;
164 let mut buf = Vec::new();
165 loop {
166 let frame = self.recv()?;
167 if frame.channel == channel {
168 buf.extend_from_slice(&frame.payload);
169 } else if frame.channel.is_empty() {
170 let c: IncomingControl =
171 serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
172 if c.channel.as_deref() == Some(&channel) {
173 if c.command == "close" {
174 if let Some(p) = c.problem {
175 return Err(FezError::Problem(p));
176 }
177 }
178 if c.command == "done" || c.command == "close" {
179 return Ok(buf);
180 }
181 }
182 }
183 }
184 }
185
186 pub fn stream_each<F: FnMut(&[u8])>(&mut self, argv: &[&str], mut on_chunk: F) -> Result<()> {
188 let channel = self.alloc_channel();
189 self.send_control(&Control::open(&channel, "stream").opt("spawn", json!(argv)))?;
190 loop {
191 let frame = self.recv()?;
192 if frame.channel == channel {
193 on_chunk(&frame.payload);
194 } else if frame.channel.is_empty() {
195 let c: IncomingControl =
196 serde_json::from_slice(&frame.payload).map_err(FezError::Decode)?;
197 if c.channel.as_deref() == Some(&channel) {
198 if c.command == "close" {
199 if let Some(p) = c.problem {
200 return Err(FezError::Problem(p));
201 }
202 }
203 if c.command == "done" || c.command == "close" {
204 return Ok(());
205 }
206 }
207 }
208 }
209 }
210
211 pub fn host(&self) -> &str {
213 &self.host
214 }
215}
216
217impl Drop for BridgeClient {
218 fn drop(&mut self) {
219 let _ = self.child.kill();
220 let _ = self.child.wait();
221 }
222}