1use super::rpc::{
7 PingResponse, ServiceConfig, ServiceStats, ServiceStatus, XinitStatus, ZinitClient,
8};
9use anyhow::{Result, anyhow};
10use std::sync::mpsc;
11use std::thread;
12
13enum RpcCmd {
15 Ping {
16 reply: mpsc::Sender<Result<PingResponse>>,
17 },
18 List {
19 reply: mpsc::Sender<Result<Vec<String>>>,
20 },
21 Status {
22 name: String,
23 reply: mpsc::Sender<Result<ServiceStatus>>,
24 },
25 Start {
26 name: String,
27 reply: mpsc::Sender<Result<bool>>,
28 },
29 Stop {
30 name: String,
31 reply: mpsc::Sender<Result<bool>>,
32 },
33 Restart {
34 name: String,
35 reply: mpsc::Sender<Result<bool>>,
36 },
37 Delete {
38 name: String,
39 reply: mpsc::Sender<Result<bool>>,
40 },
41 Reload {
42 name: String,
43 reply: mpsc::Sender<Result<bool>>,
44 },
45 Kill {
46 name: String,
47 signal: String,
48 reply: mpsc::Sender<Result<bool>>,
49 },
50 Stats {
51 name: String,
52 reply: mpsc::Sender<Result<ServiceStats>>,
53 },
54 IsRunning {
55 name: String,
56 reply: mpsc::Sender<Result<bool>>,
57 },
58 Monitor {
59 name: String,
60 config: Box<ServiceConfig>,
61 reply: mpsc::Sender<Result<bool>>,
62 },
63 Logs {
64 reply: mpsc::Sender<Result<Vec<String>>>,
65 },
66 LogsFilter {
67 service: String,
68 reply: mpsc::Sender<Result<Vec<String>>>,
69 },
70 LogsTail {
71 n: u32,
72 reply: mpsc::Sender<Result<Vec<String>>>,
73 },
74 StartAll {
75 reply: mpsc::Sender<Result<bool>>,
76 },
77 StopAll {
78 reply: mpsc::Sender<Result<bool>>,
79 },
80 DeleteAll {
81 reply: mpsc::Sender<Result<bool>>,
82 },
83 Shutdown {
84 reply: mpsc::Sender<Result<bool>>,
85 },
86 Reboot {
87 reply: mpsc::Sender<Result<bool>>,
88 },
89 XinitList {
91 reply: mpsc::Sender<Result<Vec<String>>>,
92 },
93 XinitRegister {
94 name: String,
95 listen: Vec<String>,
96 backend: String,
97 service: String,
98 idle_timeout: u64,
99 connect_timeout: u64,
100 reply: mpsc::Sender<Result<bool>>,
101 },
102 XinitUnregister {
103 name: String,
104 reply: mpsc::Sender<Result<bool>>,
105 },
106 XinitStatus {
107 name: String,
108 reply: mpsc::Sender<Result<XinitStatus>>,
109 },
110 XinitStatusAll {
111 reply: mpsc::Sender<Result<Vec<XinitStatus>>>,
112 },
113 Register {
115 name: String,
116 config: Box<ServiceConfig>,
117 reply: mpsc::Sender<Result<bool>>,
118 },
119 Get {
120 name: String,
121 reply: mpsc::Sender<Result<ServiceConfig>>,
122 },
123}
124
125#[derive(Clone)]
127pub struct ZinitHandle {
128 cmd_tx: mpsc::Sender<RpcCmd>,
129}
130
131impl ZinitHandle {
132 pub fn new() -> Result<Self> {
134 let client = ZinitClient::try_default()?;
135 Self::with_client(client)
136 }
137
138 pub fn with_client(client: ZinitClient) -> Result<Self> {
140 let (cmd_tx, cmd_rx) = mpsc::channel::<RpcCmd>();
141
142 thread::spawn(move || {
144 let rt = tokio::runtime::Builder::new_current_thread()
145 .enable_all()
146 .build()
147 .expect("Failed to create tokio runtime");
148
149 rt.block_on(async {
150 while let Ok(cmd) = cmd_rx.recv() {
151 match cmd {
152 RpcCmd::Ping { reply } => {
153 let _ = reply.send(client.ping().await);
154 }
155 RpcCmd::List { reply } => {
156 let _ = reply.send(client.list().await);
157 }
158 RpcCmd::Status { name, reply } => {
159 let _ = reply.send(client.status(&name).await);
160 }
161 RpcCmd::Start { name, reply } => {
162 let _ = reply.send(client.start(&name).await);
163 }
164 RpcCmd::Stop { name, reply } => {
165 let _ = reply.send(client.stop(&name).await);
166 }
167 RpcCmd::Restart { name, reply } => {
168 let _ = reply.send(client.restart(&name).await);
169 }
170 RpcCmd::Delete { name, reply } => {
171 let _ = reply.send(client.delete(&name).await);
172 }
173 RpcCmd::Reload { name, reply } => {
174 let _ = reply.send(client.reload(&name).await);
175 }
176 RpcCmd::Kill {
177 name,
178 signal,
179 reply,
180 } => {
181 let _ = reply.send(client.kill(&name, &signal).await);
182 }
183 RpcCmd::Stats { name, reply } => {
184 let _ = reply.send(client.stats(&name).await);
185 }
186 RpcCmd::IsRunning { name, reply } => {
187 let _ = reply.send(client.is_running(&name).await);
188 }
189 RpcCmd::Monitor {
190 name,
191 config,
192 reply,
193 } => {
194 let _ = reply.send(client.monitor(&name, *config).await);
195 }
196 RpcCmd::Logs { reply } => {
197 let _ = reply.send(client.logs().await);
198 }
199 RpcCmd::LogsFilter { service, reply } => {
200 let _ = reply.send(client.logs_filter(&service).await);
201 }
202 RpcCmd::LogsTail { n, reply } => {
203 let _ = reply.send(client.logs_tail(n).await);
204 }
205 RpcCmd::StartAll { reply } => {
206 let _ = reply.send(client.start_all().await);
207 }
208 RpcCmd::StopAll { reply } => {
209 let _ = reply.send(client.stop_all().await);
210 }
211 RpcCmd::DeleteAll { reply } => {
212 let _ = reply.send(client.delete_all().await);
213 }
214 RpcCmd::Shutdown { reply } => {
215 let _ = reply.send(client.shutdown().await);
216 }
217 RpcCmd::Reboot { reply } => {
218 let _ = reply.send(client.reboot().await);
219 }
220 RpcCmd::XinitList { reply } => {
222 let _ = reply.send(client.xinit_list().await);
223 }
224 RpcCmd::XinitRegister {
225 name,
226 listen,
227 backend,
228 service,
229 idle_timeout,
230 connect_timeout,
231 reply,
232 } => {
233 let _ = reply.send(
234 client
235 .xinit_register(
236 &name,
237 &listen,
238 &backend,
239 &service,
240 idle_timeout,
241 connect_timeout,
242 )
243 .await,
244 );
245 }
246 RpcCmd::XinitUnregister { name, reply } => {
247 let _ = reply.send(client.xinit_unregister(&name).await);
248 }
249 RpcCmd::XinitStatus { name, reply } => {
250 let _ = reply.send(client.xinit_status(&name).await);
251 }
252 RpcCmd::XinitStatusAll { reply } => {
253 let _ = reply.send(client.xinit_status_all().await);
254 }
255 RpcCmd::Register {
256 name,
257 config,
258 reply,
259 } => {
260 let _ = reply.send(client.register(&name, *config).await);
261 }
262 RpcCmd::Get { name, reply } => {
263 let _ = reply.send(client.get(&name).await);
264 }
265 }
266 }
267 });
268 });
269
270 Ok(Self { cmd_tx })
271 }
272
273 fn send_recv<T>(&self, cmd_fn: impl FnOnce(mpsc::Sender<Result<T>>) -> RpcCmd) -> Result<T> {
275 let (reply_tx, reply_rx) = mpsc::channel();
276 let cmd = cmd_fn(reply_tx);
277 self.cmd_tx
278 .send(cmd)
279 .map_err(|_| anyhow!("RPC thread not available"))?;
280 reply_rx
281 .recv()
282 .map_err(|_| anyhow!("RPC thread did not respond"))?
283 }
284
285 pub fn ping(&self) -> Result<PingResponse> {
288 self.send_recv(|reply| RpcCmd::Ping { reply })
289 }
290
291 pub fn shutdown(&self) -> Result<bool> {
292 self.send_recv(|reply| RpcCmd::Shutdown { reply })
293 }
294
295 pub fn reboot(&self) -> Result<bool> {
296 self.send_recv(|reply| RpcCmd::Reboot { reply })
297 }
298
299 pub fn list(&self) -> Result<Vec<String>> {
302 self.send_recv(|reply| RpcCmd::List { reply })
303 }
304
305 pub fn status(&self, name: &str) -> Result<ServiceStatus> {
306 self.send_recv(|reply| RpcCmd::Status {
307 name: name.to_string(),
308 reply,
309 })
310 }
311
312 pub fn start(&self, name: &str) -> Result<()> {
313 self.send_recv(|reply| RpcCmd::Start {
314 name: name.to_string(),
315 reply,
316 })?;
317 Ok(())
318 }
319
320 pub fn stop(&self, name: &str) -> Result<()> {
321 self.send_recv(|reply| RpcCmd::Stop {
322 name: name.to_string(),
323 reply,
324 })?;
325 Ok(())
326 }
327
328 pub fn restart(&self, name: &str) -> Result<()> {
329 self.send_recv(|reply| RpcCmd::Restart {
330 name: name.to_string(),
331 reply,
332 })?;
333 Ok(())
334 }
335
336 pub fn delete(&self, name: &str) -> Result<()> {
337 self.send_recv(|reply| RpcCmd::Delete {
338 name: name.to_string(),
339 reply,
340 })?;
341 Ok(())
342 }
343
344 pub fn reload(&self, name: &str) -> Result<()> {
346 self.send_recv(|reply| RpcCmd::Reload {
347 name: name.to_string(),
348 reply,
349 })?;
350 Ok(())
351 }
352
353 pub fn kill(&self, name: &str, signal: &str) -> Result<()> {
354 self.send_recv(|reply| RpcCmd::Kill {
355 name: name.to_string(),
356 signal: signal.to_string(),
357 reply,
358 })?;
359 Ok(())
360 }
361
362 pub fn stats(&self, name: &str) -> Result<ServiceStats> {
363 self.send_recv(|reply| RpcCmd::Stats {
364 name: name.to_string(),
365 reply,
366 })
367 }
368
369 pub fn is_running(&self, name: &str) -> Result<bool> {
370 self.send_recv(|reply| RpcCmd::IsRunning {
371 name: name.to_string(),
372 reply,
373 })
374 }
375
376 pub fn monitor(&self, name: &str, config: ServiceConfig) -> Result<()> {
377 self.send_recv(|reply| RpcCmd::Monitor {
378 name: name.to_string(),
379 config: Box::new(config),
380 reply,
381 })?;
382 Ok(())
383 }
384
385 pub fn start_all(&self) -> Result<()> {
386 self.send_recv(|reply| RpcCmd::StartAll { reply })?;
387 Ok(())
388 }
389
390 pub fn stop_all(&self) -> Result<()> {
391 self.send_recv(|reply| RpcCmd::StopAll { reply })?;
392 Ok(())
393 }
394
395 pub fn delete_all(&self) -> Result<()> {
396 self.send_recv(|reply| RpcCmd::DeleteAll { reply })?;
397 Ok(())
398 }
399
400 pub fn logs(&self) -> Result<Vec<String>> {
403 self.send_recv(|reply| RpcCmd::Logs { reply })
404 }
405
406 pub fn logs_filter(&self, service: &str) -> Result<Vec<String>> {
407 self.send_recv(|reply| RpcCmd::LogsFilter {
408 service: service.to_string(),
409 reply,
410 })
411 }
412
413 pub fn logs_tail(&self, n: u32) -> Result<Vec<String>> {
414 self.send_recv(|reply| RpcCmd::LogsTail { n, reply })
415 }
416
417 pub fn xinit_list(&self) -> Result<Vec<String>> {
420 self.send_recv(|reply| RpcCmd::XinitList { reply })
421 }
422
423 pub fn xinit_register(
424 &self,
425 name: &str,
426 listen: &[String],
427 backend: &str,
428 service: &str,
429 idle_timeout: u64,
430 connect_timeout: u64,
431 ) -> Result<()> {
432 self.send_recv(|reply| RpcCmd::XinitRegister {
433 name: name.to_string(),
434 listen: listen.to_vec(),
435 backend: backend.to_string(),
436 service: service.to_string(),
437 idle_timeout,
438 connect_timeout,
439 reply,
440 })?;
441 Ok(())
442 }
443
444 pub fn xinit_unregister(&self, name: &str) -> Result<()> {
445 self.send_recv(|reply| RpcCmd::XinitUnregister {
446 name: name.to_string(),
447 reply,
448 })?;
449 Ok(())
450 }
451
452 pub fn xinit_status(&self, name: &str) -> Result<XinitStatus> {
453 self.send_recv(|reply| RpcCmd::XinitStatus {
454 name: name.to_string(),
455 reply,
456 })
457 }
458
459 pub fn xinit_status_all(&self) -> Result<Vec<XinitStatus>> {
460 self.send_recv(|reply| RpcCmd::XinitStatusAll { reply })
461 }
462
463 pub fn register(&self, name: &str, config: ServiceConfig) -> Result<()> {
467 self.send_recv(|reply| RpcCmd::Register {
468 name: name.to_string(),
469 config: Box::new(config),
470 reply,
471 })?;
472 Ok(())
473 }
474
475 pub fn get(&self, name: &str) -> Result<ServiceConfig> {
477 self.send_recv(|reply| RpcCmd::Get {
478 name: name.to_string(),
479 reply,
480 })
481 }
482}