zinit_client/client/
rpc.rs

1//! Simple JSON-RPC client
2//!
3//! Connects to zinit server via Unix socket or TCP.
4
5use anyhow::{Context, Result};
6use serde::{de::DeserializeOwned, Deserialize, Serialize};
7use serde_json::{json, Value};
8use std::collections::HashMap;
9use std::path::Path;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tokio::net::{TcpStream, UnixStream};
12
13/// JSON-RPC Request
14#[derive(Debug, Serialize)]
15struct RpcRequest {
16    jsonrpc: &'static str,
17    method: String,
18    params: Value,
19    id: u64,
20}
21
22/// JSON-RPC Response
23#[derive(Debug, Deserialize)]
24struct RpcResponse {
25    #[allow(dead_code)]
26    jsonrpc: String,
27    result: Option<Value>,
28    error: Option<RpcError>,
29    #[allow(dead_code)]
30    id: Value,
31}
32
33#[derive(Debug, Deserialize)]
34struct RpcError {
35    #[allow(dead_code)]
36    code: i32,
37    message: String,
38}
39
40/// Process node in the process tree
41#[derive(Debug, Clone, Default, Serialize, Deserialize)]
42pub struct ProcessNode {
43    pub pid: i32,
44    pub ppid: i32,
45    pub name: String,
46    #[serde(default)]
47    pub children: Vec<i32>,
48}
49
50/// Process tree for a service
51#[derive(Debug, Clone, Default, Serialize, Deserialize)]
52pub struct ProcessTree {
53    #[serde(default)]
54    pub root: i32,
55    #[serde(default)]
56    pub processes: std::collections::HashMap<i32, ProcessNode>,
57}
58
59impl ProcessTree {
60    /// Get all PIDs in the tree
61    pub fn all_pids(&self) -> Vec<i32> {
62        self.processes.keys().copied().collect()
63    }
64
65    /// Get PIDs ordered deepest-first (for killing)
66    pub fn pids_depth_first(&self) -> Vec<i32> {
67        let mut result = Vec::new();
68        self.collect_depth_first(self.root, &mut result);
69        result.reverse();
70        result
71    }
72
73    fn collect_depth_first(&self, pid: i32, result: &mut Vec<i32>) {
74        if let Some(node) = self.processes.get(&pid) {
75            result.push(pid);
76            for &child in &node.children {
77                self.collect_depth_first(child, result);
78            }
79        }
80    }
81}
82
83/// Service status
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct ServiceStatus {
86    pub name: String,
87    pub pid: i32,
88    pub state: String,
89    pub target: String,
90    #[serde(default)]
91    pub process_tree: ProcessTree,
92}
93
94/// Service statistics
95#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct ServiceStats {
97    pub pid: i32,
98    pub memory_usage: u64,
99    pub cpu_usage: f32,
100    pub children: Vec<ProcessStats>,
101}
102
103/// Process statistics
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct ProcessStats {
106    pub pid: i32,
107    pub memory_usage: u64,
108    pub cpu_usage: f32,
109}
110
111fn default_period() -> u64 {
112    10
113}
114
115fn default_retry() -> u32 {
116    3
117}
118
119/// Health check configuration
120#[derive(Debug, Clone, Default, Serialize, Deserialize)]
121pub struct HealthCheck {
122    /// Period in seconds between health checks (default: 10)
123    #[serde(default = "default_period")]
124    pub period: u64,
125    /// Number of retries before marking service as failed (default: 3)
126    #[serde(default = "default_retry")]
127    pub retry: u32,
128    /// Command-based health check(s)
129    #[serde(default)]
130    pub test_cmd: Vec<String>,
131    /// TCP port check(s): "host:port"
132    #[serde(default)]
133    pub test_tcp: Vec<String>,
134    /// HTTP check(s): full URL
135    #[serde(default)]
136    pub test_http: Vec<String>,
137    /// If true, kill any process on the TCP port before starting
138    #[serde(default)]
139    pub tcp_kill: bool,
140}
141
142fn default_status() -> String {
143    "start".to_string()
144}
145
146/// Service configuration
147#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct ServiceConfig {
149    pub exec: String,
150    /// Desired status: start (default), stop, or ignore
151    #[serde(default = "default_status")]
152    pub status: String,
153    #[serde(default)]
154    pub oneshot: bool,
155    #[serde(default = "default_shutdown_timeout")]
156    pub shutdown_timeout: u64,
157    #[serde(default)]
158    pub after: Vec<String>,
159    #[serde(default)]
160    pub signal_stop: String,
161    #[serde(default = "default_log")]
162    pub log: String,
163    #[serde(default)]
164    pub env: HashMap<String, String>,
165    #[serde(default)]
166    pub dir: String,
167    #[serde(default)]
168    pub health: HealthCheck,
169}
170
171fn default_shutdown_timeout() -> u64 {
172    10
173}
174fn default_log() -> String {
175    "ring".to_string()
176}
177
178impl Default for ServiceConfig {
179    fn default() -> Self {
180        Self {
181            exec: String::new(),
182            status: "start".to_string(),
183            oneshot: false,
184            shutdown_timeout: 10,
185            after: Vec::new(),
186            signal_stop: String::new(),
187            log: "ring".to_string(),
188            env: HashMap::new(),
189            dir: String::new(),
190            health: HealthCheck::default(),
191        }
192    }
193}
194
195/// Ping response
196#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct PingResponse {
198    pub message: String,
199    pub version: String,
200}
201
202/// Xinit proxy status
203#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct XinitStatus {
205    pub name: String,
206    pub listen: String,
207    pub backend: String,
208    pub service: String,
209    pub running: bool,
210    pub total_connections: u64,
211    pub active_connections: u32,
212    #[serde(default)]
213    pub bytes_to_backend: u64,
214    #[serde(default)]
215    pub bytes_from_backend: u64,
216}
217
218/// Zinit RPC client
219pub struct ZinitClient {
220    addr: String,
221}
222
223impl ZinitClient {
224    /// Connect via Unix socket
225    pub fn unix<P: AsRef<Path>>(path: P) -> Self {
226        Self {
227            addr: format!("unix:{}", path.as_ref().display()),
228        }
229    }
230
231    /// Connect via TCP
232    pub fn tcp(addr: &str) -> Self {
233        Self {
234            addr: format!("tcp:{}", addr),
235        }
236    }
237
238    /// Connect to default socket
239    pub fn try_default() -> Result<Self> {
240        let socket_path = get_socket_path()?;
241        Ok(Self::unix(socket_path))
242    }
243
244    /// Make an RPC call
245    async fn call<T: DeserializeOwned>(&self, method: &str, params: Value) -> Result<T> {
246        let request = RpcRequest {
247            jsonrpc: "2.0",
248            method: method.to_string(),
249            params,
250            id: 1,
251        };
252
253        let request_json = serde_json::to_string(&request)? + "\n";
254
255        // Connect and send
256        let response_json = if self.addr.starts_with("unix:") {
257            let path = self.addr.trim_start_matches("unix:");
258            let mut stream = UnixStream::connect(path)
259                .await
260                .context("Failed to connect to Unix socket")?;
261
262            stream.write_all(request_json.as_bytes()).await?;
263            stream.flush().await?;
264
265            let mut reader = BufReader::new(stream);
266            let mut line = String::new();
267            reader.read_line(&mut line).await?;
268            line
269        } else {
270            let addr = self.addr.trim_start_matches("tcp:");
271            let mut stream = TcpStream::connect(addr)
272                .await
273                .context("Failed to connect to TCP")?;
274
275            stream.write_all(request_json.as_bytes()).await?;
276            stream.flush().await?;
277
278            let mut reader = BufReader::new(stream);
279            let mut line = String::new();
280            reader.read_line(&mut line).await?;
281            line
282        };
283
284        let response: RpcResponse =
285            serde_json::from_str(&response_json).context("Failed to parse response")?;
286
287        if let Some(error) = response.error {
288            anyhow::bail!("{}", error.message);
289        }
290
291        let result = response.result.unwrap_or(Value::Null);
292        serde_json::from_value(result).context("Failed to parse result")
293    }
294
295    // System methods
296
297    pub async fn ping(&self) -> Result<PingResponse> {
298        self.call("system.ping", json!([])).await
299    }
300
301    pub async fn shutdown(&self) -> Result<bool> {
302        self.call("system.shutdown", json!([])).await
303    }
304
305    pub async fn reboot(&self) -> Result<bool> {
306        self.call("system.reboot", json!([])).await
307    }
308
309    // Service methods
310
311    pub async fn list(&self) -> Result<Vec<String>> {
312        self.call("service.list", json!([])).await
313    }
314
315    pub async fn status(&self, name: &str) -> Result<ServiceStatus> {
316        self.call("service.status", json!([name])).await
317    }
318
319    pub async fn start(&self, name: &str) -> Result<bool> {
320        self.call("service.start", json!([name])).await
321    }
322
323    pub async fn stop(&self, name: &str) -> Result<bool> {
324        self.call("service.stop", json!([name])).await
325    }
326
327    pub async fn restart(&self, name: &str) -> Result<bool> {
328        self.call("service.restart", json!([name])).await
329    }
330
331    pub async fn delete(&self, name: &str) -> Result<bool> {
332        self.call("service.delete", json!([name])).await
333    }
334
335    /// Reload a service from its YAML file on disk
336    /// If the service is already monitored, it will be stopped and re-monitored
337    pub async fn reload(&self, name: &str) -> Result<bool> {
338        self.call("service.reload", json!([name])).await
339    }
340
341    pub async fn kill(&self, name: &str, signal: &str) -> Result<bool> {
342        self.call("service.kill", json!([name, signal])).await
343    }
344
345    pub async fn monitor(&self, name: &str, config: ServiceConfig) -> Result<bool> {
346        self.call("service.monitor", json!([name, config])).await
347    }
348
349    pub async fn stats(&self, name: &str) -> Result<ServiceStats> {
350        self.call("service.stats", json!([name])).await
351    }
352
353    pub async fn is_running(&self, name: &str) -> Result<bool> {
354        self.call("service.is_running", json!([name])).await
355    }
356
357    pub async fn start_all(&self) -> Result<bool> {
358        self.call("service.start_all", json!([])).await
359    }
360
361    pub async fn stop_all(&self) -> Result<bool> {
362        self.call("service.stop_all", json!([])).await
363    }
364
365    pub async fn delete_all(&self) -> Result<bool> {
366        self.call("service.delete_all", json!([])).await
367    }
368
369    // Log methods
370
371    pub async fn logs(&self) -> Result<Vec<String>> {
372        self.call("logs.get", json!([])).await
373    }
374
375    pub async fn logs_filter(&self, service: &str) -> Result<Vec<String>> {
376        self.call("logs.filter", json!([service])).await
377    }
378
379    pub async fn logs_tail(&self, n: u32) -> Result<Vec<String>> {
380        self.call("logs.tail", json!([n])).await
381    }
382
383    // Xinit methods
384
385    pub async fn xinit_list(&self) -> Result<Vec<String>> {
386        self.call("xinit.list", json!([])).await
387    }
388
389    pub async fn xinit_register(
390        &self,
391        name: &str,
392        listen: &[String],
393        backend: &str,
394        service: &str,
395        idle_timeout: u64,
396        connect_timeout: u64,
397    ) -> Result<bool> {
398        self.call(
399            "xinit.register",
400            json!([
401                name,
402                listen,
403                backend,
404                service,
405                idle_timeout,
406                connect_timeout
407            ]),
408        )
409        .await
410    }
411
412    pub async fn xinit_unregister(&self, name: &str) -> Result<bool> {
413        self.call("xinit.unregister", json!([name])).await
414    }
415
416    pub async fn xinit_status(&self, name: &str) -> Result<XinitStatus> {
417        self.call("xinit.status", json!([name])).await
418    }
419
420    pub async fn xinit_status_all(&self) -> Result<Vec<XinitStatus>> {
421        self.call("xinit.status_all", json!([])).await
422    }
423
424    // ============ Service Register/Get ============
425
426    /// Register a service: save config to YAML file and monitor it
427    pub async fn register(&self, name: &str, config: ServiceConfig) -> Result<bool> {
428        self.call("service.register", json!([name, config])).await
429    }
430
431    /// Get service configuration from YAML file on disk
432    pub async fn get(&self, name: &str) -> Result<ServiceConfig> {
433        self.call("service.get", json!([name])).await
434    }
435}
436
437/// Socket paths
438const INIT_SOCKET_PATH: &str = "/var/run/zinit.sock";
439const USER_SOCKET_PATH: &str = "hero/var/zinit.sock";
440
441/// Get the default socket path
442/// Prefers init mode socket if it exists, otherwise falls back to user mode
443pub fn get_socket_path() -> Result<std::path::PathBuf> {
444    // Check init mode socket first
445    let init_socket = std::path::PathBuf::from(INIT_SOCKET_PATH);
446    if init_socket.exists() {
447        return Ok(init_socket);
448    }
449
450    // Fall back to user mode socket
451    let home = dirs::home_dir().context("Could not determine home directory")?;
452    Ok(home.join(USER_SOCKET_PATH))
453}