1use super::messages::{ErrorReturn, Event, GenericReturn, JobInfo, QueryBlock, QueryJobs};
2use anyhow::{anyhow, Result};
3use serde_json::{json, Value};
4use std::{
5 io::{prelude::*, BufReader},
6 os::unix::net::UnixStream,
7 path::PathBuf,
8};
9
/// Client for a JSON command/reply protocol spoken over a Unix domain socket.
///
/// Both fields refer to the same connection: `Client::new` connects once and
/// clones the stream so that writing and buffered reading use separate handles.
#[derive(Debug)]
pub struct Client {
    /// Handle used for writing serialized commands to the peer.
    output: UnixStream,
    /// Buffered handle used for reading replies line by line.
    input: BufReader<UnixStream>,
}
14
15impl Client {
16 pub fn new(us: PathBuf) -> std::io::Result<Self> {
17 let stream = UnixStream::connect(us)?;
18 return Ok(Self {
19 output: stream.try_clone()?,
20 input: BufReader::new(stream),
21 });
22 }
23
24 fn read_input<T>(&mut self) -> Result<T>
25 where
26 T: for<'de> serde::Deserialize<'de> + Default + std::fmt::Debug,
27 {
28 let mut buf = String::new();
29 while let Ok(_) = self.input.read_line(&mut buf) {
30 if buf.ends_with("\r\n}\r\n") {
31 match serde_json::from_str::<T>(&buf) {
32 Ok(obj) => {
33 return Ok(obj);
34 }
35 Err(e) => {
36 if let Ok(_) = serde_json::from_str::<Event>(&buf) {
38 buf = String::new();
39 } else if let Ok(e) = serde_json::from_str::<ErrorReturn>(&buf) {
40 return Err(e.into());
42 } else if let Ok(ret) = serde_json::from_str::<GenericReturn>(&buf) {
43 return ret.into();
44 } else {
45 return Err(e.into());
47 }
48 }
49 }
50 }
51 }
52
53 return Err(anyhow!("Read past end of input"));
54 }
55
56 fn send_output(&mut self, val: Value) -> Result<()> {
57 match self.output.write_all(&val.to_string().as_bytes()) {
58 Ok(_) => Ok(()),
59 Err(e) => Err(anyhow!(e)),
60 }
61 }
62
63 pub fn handshake(&mut self) -> Result<()> {
64 self.read_input::<Event>()?;
66 Ok(())
67 }
68
69 pub fn parsed_reply(&mut self) -> Result<GenericReturn> {
70 self.read_input()
71 }
72
73 pub fn send_command<T>(&mut self, execute: &str, args: Option<Value>) -> Result<T>
74 where
75 T: for<'de> serde::Deserialize<'de> + Default + std::fmt::Debug,
76 {
77 if let Some(args) = args {
78 self.send_output(json!({
79 "execute": execute,
80 "arguments": args,
81 }))?;
82 } else {
83 self.send_output(json!({
84 "execute": execute,
85 }))?;
86 }
87
88 self.read_input()
89 }
90
91 pub fn block_devices(&mut self) -> Result<QueryBlock> {
92 self.send_command("query-block", None)
93 }
94
95 pub fn jobs(&mut self) -> Result<QueryJobs> {
96 self.send_command("query-jobs", None)
97 }
98
99 pub fn disk_nodes(&mut self) -> Result<Vec<String>> {
100 let blocks = self.block_devices()?.result;
101
102 let mut disks = Vec::new();
103
104 for item in blocks {
105 if let Some(inserted) = item.inserted {
106 if let Some(name) = inserted.node_name {
107 disks.push(name)
108 }
109 }
110 }
111
112 Ok(disks)
113 }
114
115 pub fn wait_for_job(&mut self, id: &str) -> Result<JobInfo> {
116 loop {
117 let res = self.jobs();
118
119 if let Ok(jobs) = res {
120 for job in jobs.result {
121 if job.id == id {
122 match job.status.as_str() {
123 "concluded" | "null" => {
124 if let Some(error) = job.error {
125 self.delete_job(id)?;
126 return Err(anyhow!(error));
127 } else {
128 self.delete_job(id)?;
129 return Ok(job);
130 }
131 }
132 _ => {}
133 }
134 break;
135 }
136 }
137 } else if let Err(e) = res {
138 self.delete_job(id)?;
139 return Err(e);
140 }
141
142 std::thread::sleep(std::time::Duration::new(0, 200))
143 }
144 }
145
146 pub fn delete_job(&mut self, id: &str) -> Result<()> {
147 loop {
148 let mut found = false;
149 let res = self.send_command::<QueryJobs>("job-dismiss", Some(json!({"id": id})));
150 if let Ok(jobs) = res {
151 for job in &jobs.result {
152 if job.id == id {
153 found = true;
154 }
155 }
156 } else {
157 break;
158 }
159
160 if !found {
161 break;
162 }
163 }
164
165 Ok(())
166 }
167
168 fn cleanup_job(&mut self, res: Result<GenericReturn, anyhow::Error>, id: &str) -> Result<()> {
169 if let Err(e) = self.wait_for_job(id) {
170 self.delete_job(id)?;
171 return Err(e);
172 }
173
174 if let Err(e) = res {
175 return Err(e);
176 }
177
178 Ok(())
179 }
180
181 pub fn snapshot_save(&mut self, name: &str) -> Result<()> {
182 let disks = self.disk_nodes()?;
183
184 let res = self.send_command::<GenericReturn>(
185 "snapshot-save",
186 Some(json!({
187 "job-id": "snapshot",
188 "tag": name,
189 "vmstate": disks[0],
190 "devices": disks,
191 })),
192 );
193
194 self.cleanup_job(res, "snapshot")
195 }
196
197 pub fn snapshot_load(&mut self, name: &str) -> Result<()> {
198 let disks = self.disk_nodes()?;
199
200 let res = self.send_command::<GenericReturn>(
201 "snapshot-load",
202 Some(json!({
203 "job-id": "snapshot",
204 "tag": name,
205 "vmstate": disks[0],
206 "devices": disks,
207 })),
208 );
209
210 self.cleanup_job(res, "snapshot")
211 }
212
213 pub fn snapshot_delete(&mut self, name: &str) -> Result<()> {
214 let disks = self.disk_nodes()?;
215
216 let res = self.send_command::<GenericReturn>(
217 "snapshot-delete",
218 Some(json!({
219 "job-id": "snapshot",
220 "tag": name,
221 "devices": disks,
222 })),
223 );
224
225 self.cleanup_job(res, "snapshot")
226 }
227}