1use crate::connection::ConnectionManager;
2use crate::error::{Result, ZinitError};
3use crate::models::{LogEntry, LogStream, ServiceState, ServiceStatus, ServiceTarget};
4use crate::protocol::ProtocolHandler;
5use crate::retry::RetryStrategy;
6use chrono::Utc;
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::time::Duration;
10use tokio::io::{AsyncBufReadExt, BufReader};
11use tracing::{debug, trace};
12
13#[derive(Debug, Clone)]
15pub struct ClientConfig {
16 pub socket_path: PathBuf,
18 pub connection_timeout: Duration,
20 pub operation_timeout: Duration,
22 pub max_retries: usize,
24 pub retry_delay: Duration,
26 pub max_retry_delay: Duration,
28 pub retry_jitter: bool,
30}
31
32impl Default for ClientConfig {
33 fn default() -> Self {
34 Self {
35 socket_path: PathBuf::from("/var/run/zinit.sock"),
36 connection_timeout: Duration::from_secs(5),
37 operation_timeout: Duration::from_secs(30),
38 max_retries: 3,
39 retry_delay: Duration::from_millis(100),
40 max_retry_delay: Duration::from_secs(5),
41 retry_jitter: true,
42 }
43 }
44}
45
46#[derive(Debug, Clone)]
48pub struct ZinitClient {
49 connection_manager: ConnectionManager,
51 #[allow(dead_code)]
53 config: ClientConfig,
54}
55
56impl ZinitClient {
57 pub fn new(socket_path: impl AsRef<Path>) -> Self {
59 Self::with_config(ClientConfig {
60 socket_path: socket_path.as_ref().to_path_buf(),
61 ..Default::default()
62 })
63 }
64
65 pub fn with_config(config: ClientConfig) -> Self {
67 let retry_strategy = RetryStrategy::new(
68 config.max_retries,
69 config.retry_delay,
70 config.max_retry_delay,
71 config.retry_jitter,
72 );
73
74 let connection_manager = ConnectionManager::new(
75 &config.socket_path,
76 config.connection_timeout,
77 config.operation_timeout,
78 retry_strategy,
79 );
80
81 Self {
82 connection_manager,
83 config,
84 }
85 }
86
87 pub async fn list(&self) -> Result<HashMap<String, ServiceState>> {
89 debug!("Listing all services");
90 let response = self.connection_manager.execute_command("list").await?;
91
92 let map: HashMap<String, String> = serde_json::from_value(response)?;
93 let result = map
94 .into_iter()
95 .map(|(name, state_str)| {
96 let state = match state_str.as_str() {
97 "Unknown" => ServiceState::Unknown,
98 "Blocked" => ServiceState::Blocked,
99 "Spawned" => ServiceState::Spawned,
100 "Running" => ServiceState::Running,
101 "Success" => ServiceState::Success,
102 "Error" => ServiceState::Error,
103 "TestFailure" => ServiceState::TestFailure,
104 _ => ServiceState::Unknown,
105 };
106 (name, state)
107 })
108 .collect();
109
110 Ok(result)
111 }
112
113 pub async fn status(&self, service: impl AsRef<str>) -> Result<ServiceStatus> {
115 let service_name = service.as_ref();
116 debug!("Getting status for service: {}", service_name);
117
118 let command = ProtocolHandler::format_command("status", &[service_name]);
119 let response = self.connection_manager.execute_command(&command).await?;
120
121 let mut status: ServiceStatus = serde_json::from_value(response)?;
122
123 status.state = match status.state.to_string().as_str() {
125 "Unknown" => ServiceState::Unknown,
126 "Blocked" => ServiceState::Blocked,
127 "Spawned" => ServiceState::Spawned,
128 "Running" => ServiceState::Running,
129 "Success" => ServiceState::Success,
130 "Error" => ServiceState::Error,
131 "TestFailure" => ServiceState::TestFailure,
132 _ => ServiceState::Unknown,
133 };
134
135 status.target = match status.target.to_string().as_str() {
137 "Up" => ServiceTarget::Up,
138 "Down" => ServiceTarget::Down,
139 _ => ServiceTarget::Down,
140 };
141
142 Ok(status)
143 }
144
145 pub async fn start(&self, service: impl AsRef<str>) -> Result<()> {
147 let service_name = service.as_ref();
148 debug!("Starting service: {}", service_name);
149
150 let command = ProtocolHandler::format_command("start", &[service_name]);
151 self.connection_manager.execute_command(&command).await?;
152
153 Ok(())
154 }
155
156 pub async fn stop(&self, service: impl AsRef<str>) -> Result<()> {
158 let service_name = service.as_ref();
159 debug!("Stopping service: {}", service_name);
160
161 let command = ProtocolHandler::format_command("stop", &[service_name]);
162 self.connection_manager.execute_command(&command).await?;
163
164 Ok(())
165 }
166
167 pub async fn restart(&self, service: impl AsRef<str>) -> Result<()> {
169 let service_name = service.as_ref();
170 debug!("Restarting service: {}", service_name);
171
172 self.stop(service_name).await?;
174
175 let mut attempts = 0;
177 let max_attempts = 20;
178
179 while attempts < max_attempts {
180 let status = self.status(service_name).await?;
181 if status.pid == 0 && status.target == ServiceTarget::Down {
182 return self.start(service_name).await;
184 }
185
186 attempts += 1;
187 tokio::time::sleep(Duration::from_secs(1)).await;
188 }
189
190 self.kill(service_name, "SIGKILL").await?;
192 self.start(service_name).await
193 }
194
195 pub async fn monitor(&self, service: impl AsRef<str>) -> Result<()> {
197 let service_name = service.as_ref();
198 debug!("Monitoring service: {}", service_name);
199
200 let command = ProtocolHandler::format_command("monitor", &[service_name]);
201 self.connection_manager.execute_command(&command).await?;
202
203 Ok(())
204 }
205
206 pub async fn forget(&self, service: impl AsRef<str>) -> Result<()> {
208 let service_name = service.as_ref();
209 debug!("Forgetting service: {}", service_name);
210
211 let command = ProtocolHandler::format_command("forget", &[service_name]);
212 self.connection_manager.execute_command(&command).await?;
213
214 Ok(())
215 }
216
217 pub async fn kill(&self, service: impl AsRef<str>, signal: impl AsRef<str>) -> Result<()> {
219 let service_name = service.as_ref();
220 let signal_name = signal.as_ref();
221 debug!(
222 "Sending signal {} to service: {}",
223 signal_name, service_name
224 );
225
226 let command = ProtocolHandler::format_command("kill", &[service_name, signal_name]);
227 self.connection_manager.execute_command(&command).await?;
228
229 Ok(())
230 }
231
232 pub async fn logs(&self, follow: bool, filter: Option<impl AsRef<str>>) -> Result<LogStream> {
234 let command = if follow {
235 "log".to_string()
236 } else {
237 "log snapshot".to_string()
238 };
239
240 debug!("Streaming logs with command: {}", command);
241 let stream = self.connection_manager.stream_logs(&command).await?;
242 let reader = BufReader::new(stream);
243 let mut lines = reader.lines();
244
245 let filter_str = filter.as_ref().map(|f| f.as_ref().to_string());
247
248 let log_stream = async_stream::stream! {
249 while let Some(line_result) = lines.next_line().await.transpose() {
250 match line_result {
251 Ok(line) => {
252 trace!("Received log line: {}", line);
253
254 if let Some(entry) = parse_log_line(&line, &filter_str) {
256 yield Ok(entry);
257 }
258 }
259 Err(e) => {
260 yield Err(ZinitError::ConnectionError(e));
261 break;
262 }
263 }
264 }
265 };
266
267 Ok(LogStream {
268 inner: Box::pin(log_stream),
269 })
270 }
271
272 pub async fn shutdown(&self) -> Result<()> {
274 debug!("Shutting down the system");
275 self.connection_manager.execute_command("shutdown").await?;
276 Ok(())
277 }
278
279 pub async fn reboot(&self) -> Result<()> {
281 debug!("Rebooting the system");
282 self.connection_manager.execute_command("reboot").await?;
283 Ok(())
284 }
285}
286
287fn parse_log_line(line: &str, filter: &Option<String>) -> Option<LogEntry> {
289 let parts: Vec<&str> = line.splitn(4, ' ').collect();
291
292 if parts.len() < 4 || !parts[0].starts_with("zinit:") {
293 return None;
294 }
295
296 let level = parts[1];
297 let service = parts[2].trim_start_matches('(').trim_end_matches(')');
298
299 if let Some(filter_str) = filter {
301 if service != filter_str {
302 return None;
303 }
304 }
305
306 let message = parts[3];
307 let timestamp = Utc::now(); Some(LogEntry {
310 timestamp,
311 service: service.to_string(),
312 message: format!("[{}] {}", level, message),
313 })
314}