1use anyhow::{Context, Result};
6use serde::{Deserialize, Serialize, de::DeserializeOwned};
7use serde_json::{Value, json};
8use std::collections::HashMap;
9use std::path::Path;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tokio::net::{TcpStream, UnixStream};
12
13#[derive(Debug, Serialize)]
15struct RpcRequest {
16 jsonrpc: &'static str,
17 method: String,
18 params: Value,
19 id: u64,
20}
21
22#[derive(Debug, Deserialize)]
24struct RpcResponse {
25 #[allow(dead_code)]
26 jsonrpc: String,
27 result: Option<Value>,
28 error: Option<RpcError>,
29 #[allow(dead_code)]
30 id: Value,
31}
32
33#[derive(Debug, Deserialize)]
34struct RpcError {
35 #[allow(dead_code)]
36 code: i32,
37 message: String,
38}
39
40#[derive(Debug, Clone, Default, Serialize, Deserialize)]
42pub struct ProcessNode {
43 pub pid: i32,
44 pub ppid: i32,
45 pub name: String,
46 #[serde(default)]
47 pub children: Vec<i32>,
48}
49
50#[derive(Debug, Clone, Default, Serialize, Deserialize)]
52pub struct ProcessTree {
53 #[serde(default)]
54 pub root: i32,
55 #[serde(default)]
56 pub processes: std::collections::HashMap<i32, ProcessNode>,
57}
58
59impl ProcessTree {
60 pub fn all_pids(&self) -> Vec<i32> {
62 self.processes.keys().copied().collect()
63 }
64
65 pub fn pids_depth_first(&self) -> Vec<i32> {
67 let mut result = Vec::new();
68 self.collect_depth_first(self.root, &mut result);
69 result.reverse();
70 result
71 }
72
73 fn collect_depth_first(&self, pid: i32, result: &mut Vec<i32>) {
74 if let Some(node) = self.processes.get(&pid) {
75 result.push(pid);
76 for &child in &node.children {
77 self.collect_depth_first(child, result);
78 }
79 }
80 }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct ServiceStatus {
86 pub name: String,
87 pub pid: i32,
88 pub state: String,
89 pub target: String,
90 #[serde(default)]
91 pub process_tree: ProcessTree,
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize)]
96pub struct ServiceStats {
97 pub pid: i32,
98 pub memory_usage: u64,
99 pub cpu_usage: f32,
100 pub children: Vec<ProcessStats>,
101}
102
103#[derive(Debug, Clone, Serialize, Deserialize)]
105pub struct ProcessStats {
106 pub pid: i32,
107 pub memory_usage: u64,
108 pub cpu_usage: f32,
109}
110
111fn default_period() -> u64 {
112 10
113}
114
115fn default_retry() -> u32 {
116 3
117}
118
119#[derive(Debug, Clone, Default, Serialize, Deserialize)]
121pub struct HealthCheck {
122 #[serde(default = "default_period")]
124 pub period: u64,
125 #[serde(default = "default_retry")]
127 pub retry: u32,
128 #[serde(default)]
130 pub test_cmd: Vec<String>,
131 #[serde(default)]
133 pub test_tcp: Vec<String>,
134 #[serde(default)]
136 pub test_http: Vec<String>,
137 #[serde(default)]
139 pub tcp_kill: bool,
140}
141
/// Value used for [`ServiceConfig::status`] when the field is absent.
fn default_status() -> String {
    String::from("start")
}
145
146#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct ServiceConfig {
149 pub exec: String,
150 #[serde(default = "default_status")]
152 pub status: String,
153 #[serde(default)]
154 pub oneshot: bool,
155 #[serde(default = "default_shutdown_timeout")]
156 pub shutdown_timeout: u64,
157 #[serde(default)]
158 pub after: Vec<String>,
159 #[serde(default)]
160 pub signal_stop: String,
161 #[serde(default = "default_log")]
162 pub log: String,
163 #[serde(default)]
164 pub env: HashMap<String, String>,
165 #[serde(default)]
166 pub dir: String,
167 #[serde(default)]
168 pub health: HealthCheck,
169}
170
/// Value used for [`ServiceConfig::shutdown_timeout`] when the field is absent.
fn default_shutdown_timeout() -> u64 {
    const FALLBACK: u64 = 10;
    FALLBACK
}
/// Value used for [`ServiceConfig::log`] when the field is absent.
fn default_log() -> String {
    String::from("ring")
}
177
178impl Default for ServiceConfig {
179 fn default() -> Self {
180 Self {
181 exec: String::new(),
182 status: "start".to_string(),
183 oneshot: false,
184 shutdown_timeout: 10,
185 after: Vec::new(),
186 signal_stop: String::new(),
187 log: "ring".to_string(),
188 env: HashMap::new(),
189 dir: String::new(),
190 health: HealthCheck::default(),
191 }
192 }
193}
194
195#[derive(Debug, Clone, Serialize, Deserialize)]
197pub struct PingResponse {
198 pub message: String,
199 pub version: String,
200}
201
202#[derive(Debug, Clone, Serialize, Deserialize)]
204pub struct XinitStatus {
205 pub name: String,
206 pub listen: String,
207 pub backend: String,
208 pub service: String,
209 pub running: bool,
210 pub total_connections: u64,
211 pub active_connections: u32,
212 #[serde(default)]
213 pub bytes_to_backend: u64,
214 #[serde(default)]
215 pub bytes_from_backend: u64,
216}
217
/// JSON-RPC client handle.
///
/// The handle only stores the endpoint address, so it is cheap to clone and
/// can be printed for diagnostics.
#[derive(Debug, Clone)]
pub struct ZinitClient {
    /// Endpoint in `scheme:address` form, where the scheme is `unix` or `tcp`.
    addr: String,
}
222
223impl ZinitClient {
224 pub fn unix<P: AsRef<Path>>(path: P) -> Self {
226 Self {
227 addr: format!("unix:{}", path.as_ref().display()),
228 }
229 }
230
231 pub fn tcp(addr: &str) -> Self {
233 Self {
234 addr: format!("tcp:{}", addr),
235 }
236 }
237
238 pub fn try_default() -> Result<Self> {
240 let socket_path = get_socket_path()?;
241 Ok(Self::unix(socket_path))
242 }
243
244 async fn call<T: DeserializeOwned>(&self, method: &str, params: Value) -> Result<T> {
246 let request = RpcRequest {
247 jsonrpc: "2.0",
248 method: method.to_string(),
249 params,
250 id: 1,
251 };
252
253 let request_json = serde_json::to_string(&request)? + "\n";
254
255 let response_json = if self.addr.starts_with("unix:") {
257 let path = self.addr.trim_start_matches("unix:");
258 let mut stream = UnixStream::connect(path)
259 .await
260 .context("Failed to connect to Unix socket")?;
261
262 stream.write_all(request_json.as_bytes()).await?;
263 stream.flush().await?;
264
265 let mut reader = BufReader::new(stream);
266 let mut line = String::new();
267 reader.read_line(&mut line).await?;
268 line
269 } else {
270 let addr = self.addr.trim_start_matches("tcp:");
271 let mut stream = TcpStream::connect(addr)
272 .await
273 .context("Failed to connect to TCP")?;
274
275 stream.write_all(request_json.as_bytes()).await?;
276 stream.flush().await?;
277
278 let mut reader = BufReader::new(stream);
279 let mut line = String::new();
280 reader.read_line(&mut line).await?;
281 line
282 };
283
284 let response: RpcResponse =
285 serde_json::from_str(&response_json).context("Failed to parse response")?;
286
287 if let Some(error) = response.error {
288 anyhow::bail!("{}", error.message);
289 }
290
291 let result = response.result.unwrap_or(Value::Null);
292 serde_json::from_value(result).context("Failed to parse result")
293 }
294
295 pub async fn ping(&self) -> Result<PingResponse> {
298 self.call("system.ping", json!([])).await
299 }
300
301 pub async fn shutdown(&self) -> Result<bool> {
302 self.call("system.shutdown", json!([])).await
303 }
304
305 pub async fn reboot(&self) -> Result<bool> {
306 self.call("system.reboot", json!([])).await
307 }
308
309 pub async fn list(&self) -> Result<Vec<String>> {
312 self.call("service.list", json!([])).await
313 }
314
315 pub async fn status(&self, name: &str) -> Result<ServiceStatus> {
316 self.call("service.status", json!([name])).await
317 }
318
319 pub async fn start(&self, name: &str) -> Result<bool> {
320 self.call("service.start", json!([name])).await
321 }
322
323 pub async fn stop(&self, name: &str) -> Result<bool> {
324 self.call("service.stop", json!([name])).await
325 }
326
327 pub async fn restart(&self, name: &str) -> Result<bool> {
328 self.call("service.restart", json!([name])).await
329 }
330
331 pub async fn delete(&self, name: &str) -> Result<bool> {
332 self.call("service.delete", json!([name])).await
333 }
334
335 pub async fn reload(&self, name: &str) -> Result<bool> {
338 self.call("service.reload", json!([name])).await
339 }
340
341 pub async fn kill(&self, name: &str, signal: &str) -> Result<bool> {
342 self.call("service.kill", json!([name, signal])).await
343 }
344
345 pub async fn monitor(&self, name: &str, config: ServiceConfig) -> Result<bool> {
346 self.call("service.monitor", json!([name, config])).await
347 }
348
349 pub async fn stats(&self, name: &str) -> Result<ServiceStats> {
350 self.call("service.stats", json!([name])).await
351 }
352
353 pub async fn is_running(&self, name: &str) -> Result<bool> {
354 self.call("service.is_running", json!([name])).await
355 }
356
357 pub async fn start_all(&self) -> Result<bool> {
358 self.call("service.start_all", json!([])).await
359 }
360
361 pub async fn stop_all(&self) -> Result<bool> {
362 self.call("service.stop_all", json!([])).await
363 }
364
365 pub async fn delete_all(&self) -> Result<bool> {
366 self.call("service.delete_all", json!([])).await
367 }
368
369 pub async fn logs(&self) -> Result<Vec<String>> {
372 self.call("logs.get", json!([])).await
373 }
374
375 pub async fn logs_filter(&self, service: &str) -> Result<Vec<String>> {
376 self.call("logs.filter", json!([service])).await
377 }
378
379 pub async fn logs_tail(&self, n: u32) -> Result<Vec<String>> {
380 self.call("logs.tail", json!([n])).await
381 }
382
383 pub async fn xinit_list(&self) -> Result<Vec<String>> {
386 self.call("xinit.list", json!([])).await
387 }
388
389 pub async fn xinit_register(
390 &self,
391 name: &str,
392 listen: &[String],
393 backend: &str,
394 service: &str,
395 idle_timeout: u64,
396 connect_timeout: u64,
397 ) -> Result<bool> {
398 self.call(
399 "xinit.register",
400 json!([
401 name,
402 listen,
403 backend,
404 service,
405 idle_timeout,
406 connect_timeout
407 ]),
408 )
409 .await
410 }
411
412 pub async fn xinit_unregister(&self, name: &str) -> Result<bool> {
413 self.call("xinit.unregister", json!([name])).await
414 }
415
416 pub async fn xinit_status(&self, name: &str) -> Result<XinitStatus> {
417 self.call("xinit.status", json!([name])).await
418 }
419
420 pub async fn xinit_status_all(&self) -> Result<Vec<XinitStatus>> {
421 self.call("xinit.status_all", json!([])).await
422 }
423
424 pub async fn register(&self, name: &str, config: ServiceConfig) -> Result<bool> {
428 self.call("service.register", json!([name, config])).await
429 }
430
431 pub async fn get(&self, name: &str) -> Result<ServiceConfig> {
433 self.call("service.get", json!([name])).await
434 }
435}
436
437const INIT_SOCKET_PATH: &str = "/var/run/zinit.sock";
439const USER_SOCKET_PATH: &str = "hero/var/zinit.sock";
440
441pub fn get_socket_path() -> Result<std::path::PathBuf> {
444 let init_socket = std::path::PathBuf::from(INIT_SOCKET_PATH);
446 if init_socket.exists() {
447 return Ok(init_socket);
448 }
449
450 let home = dirs::home_dir().context("Could not determine home directory")?;
452 Ok(home.join(USER_SOCKET_PATH))
453}