zinit_client/
client.rs

1use crate::connection::ConnectionManager;
2use crate::error::{Result, ZinitError};
3use crate::models::{LogEntry, LogStream, ServiceState, ServiceStatus, ServiceTarget};
4use crate::protocol::ProtocolHandler;
5use crate::retry::RetryStrategy;
6use chrono::Utc;
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use std::time::Duration;
10use tokio::io::{AsyncBufReadExt, BufReader};
11use tracing::{debug, trace};
12
13/// Configuration for the Zinit client
14#[derive(Debug, Clone)]
15pub struct ClientConfig {
16    /// Path to the Zinit Unix socket
17    pub socket_path: PathBuf,
18    /// Timeout for connection attempts
19    pub connection_timeout: Duration,
20    /// Timeout for operations
21    pub operation_timeout: Duration,
22    /// Maximum number of retry attempts
23    pub max_retries: usize,
24    /// Base delay between retries
25    pub retry_delay: Duration,
26    /// Maximum delay between retries
27    pub max_retry_delay: Duration,
28    /// Whether to add jitter to retry delays
29    pub retry_jitter: bool,
30}
31
32impl Default for ClientConfig {
33    fn default() -> Self {
34        Self {
35            socket_path: PathBuf::from("/var/run/zinit.sock"),
36            connection_timeout: Duration::from_secs(5),
37            operation_timeout: Duration::from_secs(30),
38            max_retries: 3,
39            retry_delay: Duration::from_millis(100),
40            max_retry_delay: Duration::from_secs(5),
41            retry_jitter: true,
42        }
43    }
44}
45
46/// Client for interacting with Zinit
47#[derive(Debug, Clone)]
48pub struct ZinitClient {
49    /// Connection manager
50    connection_manager: ConnectionManager,
51    /// Client configuration
52    #[allow(dead_code)]
53    config: ClientConfig,
54}
55
56impl ZinitClient {
57    /// Create a new Zinit client with the default configuration
58    pub fn new(socket_path: impl AsRef<Path>) -> Self {
59        Self::with_config(ClientConfig {
60            socket_path: socket_path.as_ref().to_path_buf(),
61            ..Default::default()
62        })
63    }
64
65    /// Create a new Zinit client with a custom configuration
66    pub fn with_config(config: ClientConfig) -> Self {
67        let retry_strategy = RetryStrategy::new(
68            config.max_retries,
69            config.retry_delay,
70            config.max_retry_delay,
71            config.retry_jitter,
72        );
73
74        let connection_manager = ConnectionManager::new(
75            &config.socket_path,
76            config.connection_timeout,
77            config.operation_timeout,
78            retry_strategy,
79        );
80
81        Self {
82            connection_manager,
83            config,
84        }
85    }
86
87    /// List all services and their states
88    pub async fn list(&self) -> Result<HashMap<String, ServiceState>> {
89        debug!("Listing all services");
90        let response = self.connection_manager.execute_command("list").await?;
91
92        let map: HashMap<String, String> = serde_json::from_value(response)?;
93        let result = map
94            .into_iter()
95            .map(|(name, state_str)| {
96                let state = match state_str.as_str() {
97                    "Unknown" => ServiceState::Unknown,
98                    "Blocked" => ServiceState::Blocked,
99                    "Spawned" => ServiceState::Spawned,
100                    "Running" => ServiceState::Running,
101                    "Success" => ServiceState::Success,
102                    "Error" => ServiceState::Error,
103                    "TestFailure" => ServiceState::TestFailure,
104                    _ => ServiceState::Unknown,
105                };
106                (name, state)
107            })
108            .collect();
109
110        Ok(result)
111    }
112
113    /// Get the status of a service
114    pub async fn status(&self, service: impl AsRef<str>) -> Result<ServiceStatus> {
115        let service_name = service.as_ref();
116        debug!("Getting status for service: {}", service_name);
117
118        let command = ProtocolHandler::format_command("status", &[service_name]);
119        let response = self.connection_manager.execute_command(&command).await?;
120
121        let mut status: ServiceStatus = serde_json::from_value(response)?;
122
123        // Convert state string to enum
124        status.state = match status.state.to_string().as_str() {
125            "Unknown" => ServiceState::Unknown,
126            "Blocked" => ServiceState::Blocked,
127            "Spawned" => ServiceState::Spawned,
128            "Running" => ServiceState::Running,
129            "Success" => ServiceState::Success,
130            "Error" => ServiceState::Error,
131            "TestFailure" => ServiceState::TestFailure,
132            _ => ServiceState::Unknown,
133        };
134
135        // Convert target string to enum
136        status.target = match status.target.to_string().as_str() {
137            "Up" => ServiceTarget::Up,
138            "Down" => ServiceTarget::Down,
139            _ => ServiceTarget::Down,
140        };
141
142        Ok(status)
143    }
144
145    /// Start a service
146    pub async fn start(&self, service: impl AsRef<str>) -> Result<()> {
147        let service_name = service.as_ref();
148        debug!("Starting service: {}", service_name);
149
150        let command = ProtocolHandler::format_command("start", &[service_name]);
151        self.connection_manager.execute_command(&command).await?;
152
153        Ok(())
154    }
155
156    /// Stop a service
157    pub async fn stop(&self, service: impl AsRef<str>) -> Result<()> {
158        let service_name = service.as_ref();
159        debug!("Stopping service: {}", service_name);
160
161        let command = ProtocolHandler::format_command("stop", &[service_name]);
162        self.connection_manager.execute_command(&command).await?;
163
164        Ok(())
165    }
166
167    /// Restart a service
168    pub async fn restart(&self, service: impl AsRef<str>) -> Result<()> {
169        let service_name = service.as_ref();
170        debug!("Restarting service: {}", service_name);
171
172        // First stop the service
173        self.stop(service_name).await?;
174
175        // Wait for the service to stop
176        let mut attempts = 0;
177        let max_attempts = 20;
178
179        while attempts < max_attempts {
180            let status = self.status(service_name).await?;
181            if status.pid == 0 && status.target == ServiceTarget::Down {
182                // Service is stopped, now start it
183                return self.start(service_name).await;
184            }
185
186            attempts += 1;
187            tokio::time::sleep(Duration::from_secs(1)).await;
188        }
189
190        // Service didn't stop gracefully, try to kill it
191        self.kill(service_name, "SIGKILL").await?;
192        self.start(service_name).await
193    }
194
195    /// Monitor a service
196    pub async fn monitor(&self, service: impl AsRef<str>) -> Result<()> {
197        let service_name = service.as_ref();
198        debug!("Monitoring service: {}", service_name);
199
200        let command = ProtocolHandler::format_command("monitor", &[service_name]);
201        self.connection_manager.execute_command(&command).await?;
202
203        Ok(())
204    }
205
206    /// Forget a service
207    pub async fn forget(&self, service: impl AsRef<str>) -> Result<()> {
208        let service_name = service.as_ref();
209        debug!("Forgetting service: {}", service_name);
210
211        let command = ProtocolHandler::format_command("forget", &[service_name]);
212        self.connection_manager.execute_command(&command).await?;
213
214        Ok(())
215    }
216
217    /// Send a signal to a service
218    pub async fn kill(&self, service: impl AsRef<str>, signal: impl AsRef<str>) -> Result<()> {
219        let service_name = service.as_ref();
220        let signal_name = signal.as_ref();
221        debug!(
222            "Sending signal {} to service: {}",
223            signal_name, service_name
224        );
225
226        let command = ProtocolHandler::format_command("kill", &[service_name, signal_name]);
227        self.connection_manager.execute_command(&command).await?;
228
229        Ok(())
230    }
231
232    /// Stream logs from services
233    pub async fn logs(&self, follow: bool, filter: Option<impl AsRef<str>>) -> Result<LogStream> {
234        let command = if follow {
235            "log".to_string()
236        } else {
237            "log snapshot".to_string()
238        };
239
240        debug!("Streaming logs with command: {}", command);
241        let stream = self.connection_manager.stream_logs(&command).await?;
242        let reader = BufReader::new(stream);
243        let mut lines = reader.lines();
244
245        // Create a stream of log entries
246        let filter_str = filter.as_ref().map(|f| f.as_ref().to_string());
247
248        let log_stream = async_stream::stream! {
249            while let Some(line_result) = lines.next_line().await.transpose() {
250                match line_result {
251                    Ok(line) => {
252                        trace!("Received log line: {}", line);
253
254                        // Parse the log line
255                        if let Some(entry) = parse_log_line(&line, &filter_str) {
256                            yield Ok(entry);
257                        }
258                    }
259                    Err(e) => {
260                        yield Err(ZinitError::ConnectionError(e));
261                        break;
262                    }
263                }
264            }
265        };
266
267        Ok(LogStream {
268            inner: Box::pin(log_stream),
269        })
270    }
271
272    /// Shutdown the system
273    pub async fn shutdown(&self) -> Result<()> {
274        debug!("Shutting down the system");
275        self.connection_manager.execute_command("shutdown").await?;
276        Ok(())
277    }
278
279    /// Reboot the system
280    pub async fn reboot(&self) -> Result<()> {
281        debug!("Rebooting the system");
282        self.connection_manager.execute_command("reboot").await?;
283        Ok(())
284    }
285}
286
287/// Parse a log line into a LogEntry
288fn parse_log_line(line: &str, filter: &Option<String>) -> Option<LogEntry> {
289    // Example log line: "zinit: INFO (service) message"
290    let parts: Vec<&str> = line.splitn(4, ' ').collect();
291
292    if parts.len() < 4 || !parts[0].starts_with("zinit:") {
293        return None;
294    }
295
296    let level = parts[1];
297    let service = parts[2].trim_start_matches('(').trim_end_matches(')');
298
299    // Apply filter if provided
300    if let Some(filter_str) = filter {
301        if service != filter_str {
302            return None;
303        }
304    }
305
306    let message = parts[3];
307    let timestamp = Utc::now(); // Zinit doesn't include timestamps, so we use current time
308
309    Some(LogEntry {
310        timestamp,
311        service: service.to_string(),
312        message: format!("[{}] {}", level, message),
313    })
314}