emu_cli/qmp/
client.rs

1use super::messages::{ErrorReturn, Event, GenericReturn, JobInfo, QueryBlock, QueryJobs};
2use anyhow::{anyhow, Result};
3use serde_json::{json, Value};
4use std::{
5    io::{prelude::*, BufReader},
6    os::unix::net::UnixStream,
7    path::PathBuf,
8};
9
10pub struct Client {
11    output: UnixStream,
12    input: BufReader<UnixStream>,
13}
14
15impl Client {
16    pub fn new(us: PathBuf) -> std::io::Result<Self> {
17        let stream = UnixStream::connect(us)?;
18        return Ok(Self {
19            output: stream.try_clone()?,
20            input: BufReader::new(stream),
21        });
22    }
23
24    fn read_input<T>(&mut self) -> Result<T>
25    where
26        T: for<'de> serde::Deserialize<'de> + Default + std::fmt::Debug,
27    {
28        let mut buf = String::new();
29        while let Ok(_) = self.input.read_line(&mut buf) {
30            if buf.ends_with("\r\n}\r\n") {
31                match serde_json::from_str::<T>(&buf) {
32                    Ok(obj) => {
33                        return Ok(obj);
34                    }
35                    Err(e) => {
36                        // incoming event, ignore it and retry
37                        if let Ok(_) = serde_json::from_str::<Event>(&buf) {
38                            buf = String::new();
39                        } else if let Ok(e) = serde_json::from_str::<ErrorReturn>(&buf) {
40                            // got an error, return it
41                            return Err(e.into());
42                        } else if let Ok(ret) = serde_json::from_str::<GenericReturn>(&buf) {
43                            return ret.into();
44                        } else {
45                            // return the original error
46                            return Err(e.into());
47                        }
48                    }
49                }
50            }
51        }
52
53        return Err(anyhow!("Read past end of input"));
54    }
55
56    fn send_output(&mut self, val: Value) -> Result<()> {
57        match self.output.write_all(&val.to_string().as_bytes()) {
58            Ok(_) => Ok(()),
59            Err(e) => Err(anyhow!(e)),
60        }
61    }
62
63    pub fn handshake(&mut self) -> Result<()> {
64        // read_input hangs if the type isn't specified
65        self.read_input::<Event>()?;
66        Ok(())
67    }
68
69    pub fn parsed_reply(&mut self) -> Result<GenericReturn> {
70        self.read_input()
71    }
72
73    pub fn send_command<T>(&mut self, execute: &str, args: Option<Value>) -> Result<T>
74    where
75        T: for<'de> serde::Deserialize<'de> + Default + std::fmt::Debug,
76    {
77        if let Some(args) = args {
78            self.send_output(json!({
79                "execute": execute,
80                "arguments": args,
81            }))?;
82        } else {
83            self.send_output(json!({
84                "execute": execute,
85            }))?;
86        }
87
88        self.read_input()
89    }
90
91    pub fn block_devices(&mut self) -> Result<QueryBlock> {
92        self.send_command("query-block", None)
93    }
94
95    pub fn jobs(&mut self) -> Result<QueryJobs> {
96        self.send_command("query-jobs", None)
97    }
98
99    pub fn disk_nodes(&mut self) -> Result<Vec<String>> {
100        let blocks = self.block_devices()?.result;
101
102        let mut disks = Vec::new();
103
104        for item in blocks {
105            if let Some(inserted) = item.inserted {
106                if let Some(name) = inserted.node_name {
107                    disks.push(name)
108                }
109            }
110        }
111
112        Ok(disks)
113    }
114
115    pub fn wait_for_job(&mut self, id: &str) -> Result<JobInfo> {
116        loop {
117            let res = self.jobs();
118
119            if let Ok(jobs) = res {
120                for job in jobs.result {
121                    if job.id == id {
122                        match job.status.as_str() {
123                            "concluded" | "null" => {
124                                if let Some(error) = job.error {
125                                    self.delete_job(id)?;
126                                    return Err(anyhow!(error));
127                                } else {
128                                    self.delete_job(id)?;
129                                    return Ok(job);
130                                }
131                            }
132                            _ => {}
133                        }
134                        break;
135                    }
136                }
137            } else if let Err(e) = res {
138                self.delete_job(id)?;
139                return Err(e);
140            }
141
142            std::thread::sleep(std::time::Duration::new(0, 200))
143        }
144    }
145
146    pub fn delete_job(&mut self, id: &str) -> Result<()> {
147        loop {
148            let mut found = false;
149            let res = self.send_command::<QueryJobs>("job-dismiss", Some(json!({"id": id})));
150            if let Ok(jobs) = res {
151                for job in &jobs.result {
152                    if job.id == id {
153                        found = true;
154                    }
155                }
156            } else {
157                break;
158            }
159
160            if !found {
161                break;
162            }
163        }
164
165        Ok(())
166    }
167
168    fn cleanup_job(&mut self, res: Result<GenericReturn, anyhow::Error>, id: &str) -> Result<()> {
169        if let Err(e) = self.wait_for_job(id) {
170            self.delete_job(id)?;
171            return Err(e);
172        }
173
174        if let Err(e) = res {
175            return Err(e);
176        }
177
178        Ok(())
179    }
180
181    pub fn snapshot_save(&mut self, name: &str) -> Result<()> {
182        let disks = self.disk_nodes()?;
183
184        let res = self.send_command::<GenericReturn>(
185            "snapshot-save",
186            Some(json!({
187                "job-id": "snapshot",
188                "tag": name,
189                "vmstate": disks[0],
190                "devices": disks,
191            })),
192        );
193
194        self.cleanup_job(res, "snapshot")
195    }
196
197    pub fn snapshot_load(&mut self, name: &str) -> Result<()> {
198        let disks = self.disk_nodes()?;
199
200        let res = self.send_command::<GenericReturn>(
201            "snapshot-load",
202            Some(json!({
203                "job-id": "snapshot",
204                "tag": name,
205                "vmstate": disks[0],
206                "devices": disks,
207            })),
208        );
209
210        self.cleanup_job(res, "snapshot")
211    }
212
213    pub fn snapshot_delete(&mut self, name: &str) -> Result<()> {
214        let disks = self.disk_nodes()?;
215
216        let res = self.send_command::<GenericReturn>(
217            "snapshot-delete",
218            Some(json!({
219                "job-id": "snapshot",
220                "tag": name,
221                "devices": disks,
222            })),
223        );
224
225        self.cleanup_job(res, "snapshot")
226    }
227}