zinit_client/client/
handle.rs

1//! Blocking handle for Rhai to communicate with RPC client
2//!
3//! This handle provides simple blocking methods that Rhai functions can call.
4//! Each method sends a command to an async thread and blocks waiting for the response.
5
6use super::rpc::{
7    PingResponse, ServiceConfig, ServiceStats, ServiceStatus, XinitStatus, ZinitClient,
8};
9use anyhow::{anyhow, Result};
10use std::sync::mpsc;
11use std::thread;
12
13/// Command enum for RPC operations
14enum RpcCmd {
15    Ping {
16        reply: mpsc::Sender<Result<PingResponse>>,
17    },
18    List {
19        reply: mpsc::Sender<Result<Vec<String>>>,
20    },
21    Status {
22        name: String,
23        reply: mpsc::Sender<Result<ServiceStatus>>,
24    },
25    Start {
26        name: String,
27        reply: mpsc::Sender<Result<bool>>,
28    },
29    Stop {
30        name: String,
31        reply: mpsc::Sender<Result<bool>>,
32    },
33    Restart {
34        name: String,
35        reply: mpsc::Sender<Result<bool>>,
36    },
37    Delete {
38        name: String,
39        reply: mpsc::Sender<Result<bool>>,
40    },
41    Reload {
42        name: String,
43        reply: mpsc::Sender<Result<bool>>,
44    },
45    Kill {
46        name: String,
47        signal: String,
48        reply: mpsc::Sender<Result<bool>>,
49    },
50    Stats {
51        name: String,
52        reply: mpsc::Sender<Result<ServiceStats>>,
53    },
54    IsRunning {
55        name: String,
56        reply: mpsc::Sender<Result<bool>>,
57    },
58    Monitor {
59        name: String,
60        config: Box<ServiceConfig>,
61        reply: mpsc::Sender<Result<bool>>,
62    },
63    Logs {
64        reply: mpsc::Sender<Result<Vec<String>>>,
65    },
66    LogsFilter {
67        service: String,
68        reply: mpsc::Sender<Result<Vec<String>>>,
69    },
70    LogsTail {
71        n: u32,
72        reply: mpsc::Sender<Result<Vec<String>>>,
73    },
74    StartAll {
75        reply: mpsc::Sender<Result<bool>>,
76    },
77    StopAll {
78        reply: mpsc::Sender<Result<bool>>,
79    },
80    DeleteAll {
81        reply: mpsc::Sender<Result<bool>>,
82    },
83    Shutdown {
84        reply: mpsc::Sender<Result<bool>>,
85    },
86    Reboot {
87        reply: mpsc::Sender<Result<bool>>,
88    },
89    // Xinit commands
90    XinitList {
91        reply: mpsc::Sender<Result<Vec<String>>>,
92    },
93    XinitRegister {
94        name: String,
95        listen: Vec<String>,
96        backend: String,
97        service: String,
98        idle_timeout: u64,
99        connect_timeout: u64,
100        reply: mpsc::Sender<Result<bool>>,
101    },
102    XinitUnregister {
103        name: String,
104        reply: mpsc::Sender<Result<bool>>,
105    },
106    XinitStatus {
107        name: String,
108        reply: mpsc::Sender<Result<XinitStatus>>,
109    },
110    XinitStatusAll {
111        reply: mpsc::Sender<Result<Vec<XinitStatus>>>,
112    },
113    // Service register/get
114    Register {
115        name: String,
116        config: Box<ServiceConfig>,
117        reply: mpsc::Sender<Result<bool>>,
118    },
119    Get {
120        name: String,
121        reply: mpsc::Sender<Result<ServiceConfig>>,
122    },
123}
124
125/// Synchronous handle for Rhai functions to use
126#[derive(Clone)]
127pub struct ZinitHandle {
128    cmd_tx: mpsc::Sender<RpcCmd>,
129}
130
131impl ZinitHandle {
132    /// Create a new handle that connects to the default server
133    pub fn new() -> Result<Self> {
134        let client = ZinitClient::try_default()?;
135        Self::with_client(client)
136    }
137
138    /// Create a new handle with a specific client
139    pub fn with_client(client: ZinitClient) -> Result<Self> {
140        let (cmd_tx, cmd_rx) = mpsc::channel::<RpcCmd>();
141
142        // Spawn the async worker thread
143        thread::spawn(move || {
144            let rt = tokio::runtime::Builder::new_current_thread()
145                .enable_all()
146                .build()
147                .expect("Failed to create tokio runtime");
148
149            rt.block_on(async {
150                while let Ok(cmd) = cmd_rx.recv() {
151                    match cmd {
152                        RpcCmd::Ping { reply } => {
153                            let _ = reply.send(client.ping().await);
154                        }
155                        RpcCmd::List { reply } => {
156                            let _ = reply.send(client.list().await);
157                        }
158                        RpcCmd::Status { name, reply } => {
159                            let _ = reply.send(client.status(&name).await);
160                        }
161                        RpcCmd::Start { name, reply } => {
162                            let _ = reply.send(client.start(&name).await);
163                        }
164                        RpcCmd::Stop { name, reply } => {
165                            let _ = reply.send(client.stop(&name).await);
166                        }
167                        RpcCmd::Restart { name, reply } => {
168                            let _ = reply.send(client.restart(&name).await);
169                        }
170                        RpcCmd::Delete { name, reply } => {
171                            let _ = reply.send(client.delete(&name).await);
172                        }
173                        RpcCmd::Reload { name, reply } => {
174                            let _ = reply.send(client.reload(&name).await);
175                        }
176                        RpcCmd::Kill {
177                            name,
178                            signal,
179                            reply,
180                        } => {
181                            let _ = reply.send(client.kill(&name, &signal).await);
182                        }
183                        RpcCmd::Stats { name, reply } => {
184                            let _ = reply.send(client.stats(&name).await);
185                        }
186                        RpcCmd::IsRunning { name, reply } => {
187                            let _ = reply.send(client.is_running(&name).await);
188                        }
189                        RpcCmd::Monitor {
190                            name,
191                            config,
192                            reply,
193                        } => {
194                            let _ = reply.send(client.monitor(&name, *config).await);
195                        }
196                        RpcCmd::Logs { reply } => {
197                            let _ = reply.send(client.logs().await);
198                        }
199                        RpcCmd::LogsFilter { service, reply } => {
200                            let _ = reply.send(client.logs_filter(&service).await);
201                        }
202                        RpcCmd::LogsTail { n, reply } => {
203                            let _ = reply.send(client.logs_tail(n).await);
204                        }
205                        RpcCmd::StartAll { reply } => {
206                            let _ = reply.send(client.start_all().await);
207                        }
208                        RpcCmd::StopAll { reply } => {
209                            let _ = reply.send(client.stop_all().await);
210                        }
211                        RpcCmd::DeleteAll { reply } => {
212                            let _ = reply.send(client.delete_all().await);
213                        }
214                        RpcCmd::Shutdown { reply } => {
215                            let _ = reply.send(client.shutdown().await);
216                        }
217                        RpcCmd::Reboot { reply } => {
218                            let _ = reply.send(client.reboot().await);
219                        }
220                        // Xinit handlers
221                        RpcCmd::XinitList { reply } => {
222                            let _ = reply.send(client.xinit_list().await);
223                        }
224                        RpcCmd::XinitRegister {
225                            name,
226                            listen,
227                            backend,
228                            service,
229                            idle_timeout,
230                            connect_timeout,
231                            reply,
232                        } => {
233                            let _ = reply.send(
234                                client
235                                    .xinit_register(
236                                        &name,
237                                        &listen,
238                                        &backend,
239                                        &service,
240                                        idle_timeout,
241                                        connect_timeout,
242                                    )
243                                    .await,
244                            );
245                        }
246                        RpcCmd::XinitUnregister { name, reply } => {
247                            let _ = reply.send(client.xinit_unregister(&name).await);
248                        }
249                        RpcCmd::XinitStatus { name, reply } => {
250                            let _ = reply.send(client.xinit_status(&name).await);
251                        }
252                        RpcCmd::XinitStatusAll { reply } => {
253                            let _ = reply.send(client.xinit_status_all().await);
254                        }
255                        RpcCmd::Register {
256                            name,
257                            config,
258                            reply,
259                        } => {
260                            let _ = reply.send(client.register(&name, *config).await);
261                        }
262                        RpcCmd::Get { name, reply } => {
263                            let _ = reply.send(client.get(&name).await);
264                        }
265                    }
266                }
267            });
268        });
269
270        Ok(Self { cmd_tx })
271    }
272
273    /// Helper to send command and wait for response
274    fn send_recv<T>(&self, cmd_fn: impl FnOnce(mpsc::Sender<Result<T>>) -> RpcCmd) -> Result<T> {
275        let (reply_tx, reply_rx) = mpsc::channel();
276        let cmd = cmd_fn(reply_tx);
277        self.cmd_tx
278            .send(cmd)
279            .map_err(|_| anyhow!("RPC thread not available"))?;
280        reply_rx
281            .recv()
282            .map_err(|_| anyhow!("RPC thread did not respond"))?
283    }
284
285    // ============ System ============
286
287    pub fn ping(&self) -> Result<PingResponse> {
288        self.send_recv(|reply| RpcCmd::Ping { reply })
289    }
290
291    pub fn shutdown(&self) -> Result<bool> {
292        self.send_recv(|reply| RpcCmd::Shutdown { reply })
293    }
294
295    pub fn reboot(&self) -> Result<bool> {
296        self.send_recv(|reply| RpcCmd::Reboot { reply })
297    }
298
299    // ============ Service Management ============
300
301    pub fn list(&self) -> Result<Vec<String>> {
302        self.send_recv(|reply| RpcCmd::List { reply })
303    }
304
305    pub fn status(&self, name: &str) -> Result<ServiceStatus> {
306        self.send_recv(|reply| RpcCmd::Status {
307            name: name.to_string(),
308            reply,
309        })
310    }
311
312    pub fn start(&self, name: &str) -> Result<()> {
313        self.send_recv(|reply| RpcCmd::Start {
314            name: name.to_string(),
315            reply,
316        })?;
317        Ok(())
318    }
319
320    pub fn stop(&self, name: &str) -> Result<()> {
321        self.send_recv(|reply| RpcCmd::Stop {
322            name: name.to_string(),
323            reply,
324        })?;
325        Ok(())
326    }
327
328    pub fn restart(&self, name: &str) -> Result<()> {
329        self.send_recv(|reply| RpcCmd::Restart {
330            name: name.to_string(),
331            reply,
332        })?;
333        Ok(())
334    }
335
336    pub fn delete(&self, name: &str) -> Result<()> {
337        self.send_recv(|reply| RpcCmd::Delete {
338            name: name.to_string(),
339            reply,
340        })?;
341        Ok(())
342    }
343
344    /// Reload a service from its YAML file on disk
345    pub fn reload(&self, name: &str) -> Result<()> {
346        self.send_recv(|reply| RpcCmd::Reload {
347            name: name.to_string(),
348            reply,
349        })?;
350        Ok(())
351    }
352
353    pub fn kill(&self, name: &str, signal: &str) -> Result<()> {
354        self.send_recv(|reply| RpcCmd::Kill {
355            name: name.to_string(),
356            signal: signal.to_string(),
357            reply,
358        })?;
359        Ok(())
360    }
361
362    pub fn stats(&self, name: &str) -> Result<ServiceStats> {
363        self.send_recv(|reply| RpcCmd::Stats {
364            name: name.to_string(),
365            reply,
366        })
367    }
368
369    pub fn is_running(&self, name: &str) -> Result<bool> {
370        self.send_recv(|reply| RpcCmd::IsRunning {
371            name: name.to_string(),
372            reply,
373        })
374    }
375
376    pub fn monitor(&self, name: &str, config: ServiceConfig) -> Result<()> {
377        self.send_recv(|reply| RpcCmd::Monitor {
378            name: name.to_string(),
379            config: Box::new(config),
380            reply,
381        })?;
382        Ok(())
383    }
384
385    pub fn start_all(&self) -> Result<()> {
386        self.send_recv(|reply| RpcCmd::StartAll { reply })?;
387        Ok(())
388    }
389
390    pub fn stop_all(&self) -> Result<()> {
391        self.send_recv(|reply| RpcCmd::StopAll { reply })?;
392        Ok(())
393    }
394
395    pub fn delete_all(&self) -> Result<()> {
396        self.send_recv(|reply| RpcCmd::DeleteAll { reply })?;
397        Ok(())
398    }
399
400    // ============ Logs ============
401
402    pub fn logs(&self) -> Result<Vec<String>> {
403        self.send_recv(|reply| RpcCmd::Logs { reply })
404    }
405
406    pub fn logs_filter(&self, service: &str) -> Result<Vec<String>> {
407        self.send_recv(|reply| RpcCmd::LogsFilter {
408            service: service.to_string(),
409            reply,
410        })
411    }
412
413    pub fn logs_tail(&self, n: u32) -> Result<Vec<String>> {
414        self.send_recv(|reply| RpcCmd::LogsTail { n, reply })
415    }
416
417    // ============ Xinit ============
418
419    pub fn xinit_list(&self) -> Result<Vec<String>> {
420        self.send_recv(|reply| RpcCmd::XinitList { reply })
421    }
422
423    pub fn xinit_register(
424        &self,
425        name: &str,
426        listen: &[String],
427        backend: &str,
428        service: &str,
429        idle_timeout: u64,
430        connect_timeout: u64,
431    ) -> Result<()> {
432        self.send_recv(|reply| RpcCmd::XinitRegister {
433            name: name.to_string(),
434            listen: listen.to_vec(),
435            backend: backend.to_string(),
436            service: service.to_string(),
437            idle_timeout,
438            connect_timeout,
439            reply,
440        })?;
441        Ok(())
442    }
443
444    pub fn xinit_unregister(&self, name: &str) -> Result<()> {
445        self.send_recv(|reply| RpcCmd::XinitUnregister {
446            name: name.to_string(),
447            reply,
448        })?;
449        Ok(())
450    }
451
452    pub fn xinit_status(&self, name: &str) -> Result<XinitStatus> {
453        self.send_recv(|reply| RpcCmd::XinitStatus {
454            name: name.to_string(),
455            reply,
456        })
457    }
458
459    pub fn xinit_status_all(&self) -> Result<Vec<XinitStatus>> {
460        self.send_recv(|reply| RpcCmd::XinitStatusAll { reply })
461    }
462
463    // ============ Service Register/Get ============
464
465    /// Register a service: save config to YAML file and monitor it
466    pub fn register(&self, name: &str, config: ServiceConfig) -> Result<()> {
467        self.send_recv(|reply| RpcCmd::Register {
468            name: name.to_string(),
469            config: Box::new(config),
470            reply,
471        })?;
472        Ok(())
473    }
474
475    /// Get service configuration from YAML file on disk
476    pub fn get(&self, name: &str) -> Result<ServiceConfig> {
477        self.send_recv(|reply| RpcCmd::Get {
478            name: name.to_string(),
479            reply,
480        })
481    }
482}