Skip to main content

ix_daemon/
client.rs

1use crate::{
2    Command, DEFAULT_SOCKET_PATH, DaemonError, EnqueueSyncPayload, Request, Response,
3    ResponseResult, SyncState, WaitSyncPayload,
4};
5use std::path::Path;
6use std::process::Stdio;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::net::UnixStream;
9
/// Base back-off between connection attempts, in milliseconds.
///
/// `Client::connect` multiplies this by the 1-based attempt number before sleeping.
const CONNECT_RETRY_DELAY_MS: u64 = 100;

/// Number of connection attempts `Client::connect` makes before giving up.
const MAX_CONNECT_RETRIES: u32 = 5;
12
/// Handle for talking to the daemon over a Unix domain socket.
///
/// Only the socket path is stored; a fresh connection is opened for every request.
#[derive(Debug, Clone)]
pub struct Client {
    /// Socket path as supplied by the caller; a leading `~/` is expanded when the path is used.
    socket_path: String,
}
16
17impl Client {
18    pub fn new() -> Self {
19        Self::with_socket_path(DEFAULT_SOCKET_PATH)
20    }
21
22    pub fn with_socket_path(path: impl Into<String>) -> Self {
23        Self {
24            socket_path: path.into(),
25        }
26    }
27
28    fn expanded_socket_path(&self) -> String {
29        expand_tilde(&self.socket_path)
30    }
31
32    pub async fn connect(&self) -> Result<UnixStream, DaemonError> {
33        let socket_path = self.expanded_socket_path();
34
35        for attempt in 0..MAX_CONNECT_RETRIES {
36            match UnixStream::connect(&socket_path).await {
37                Ok(stream) => return Ok(stream),
38                Err(e)
39                    if e.kind() == std::io::ErrorKind::NotFound
40                        || e.kind() == std::io::ErrorKind::ConnectionRefused =>
41                {
42                    if attempt == 0 {
43                        self.start_daemon().await?;
44                    }
45                    tokio::time::sleep(tokio::time::Duration::from_millis(
46                        CONNECT_RETRY_DELAY_MS * (u64::from(attempt) + 1),
47                    ))
48                    .await;
49                }
50                Err(e) => return Err(e.into()),
51            }
52        }
53
54        Err(DaemonError::Internal(format!(
55            "Failed to connect to daemon after {MAX_CONNECT_RETRIES} retries"
56        )))
57    }
58
59    async fn start_daemon(&self) -> Result<(), DaemonError> {
60        let socket_path = self.expanded_socket_path();
61
62        if let Some(parent) = Path::new(&socket_path).parent() {
63            tokio::fs::create_dir_all(parent).await?;
64        }
65
66        let ixcheld_path = std::env::current_exe()
67            .ok()
68            .and_then(|p| p.parent().map(|d| d.join("ixcheld")))
69            .filter(|p| p.exists())
70            .unwrap_or_else(|| "ixcheld".into());
71
72        tokio::process::Command::new(ixcheld_path)
73            .arg("--socket")
74            .arg(&socket_path)
75            .stdin(Stdio::null())
76            .stdout(Stdio::null())
77            .stderr(Stdio::null())
78            .spawn()
79            .map_err(|e| DaemonError::Internal(format!("Failed to start ixcheld: {e}")))?;
80
81        Ok(())
82    }
83
84    pub async fn send(&self, request: Request) -> Result<Response, DaemonError> {
85        let mut stream = self.connect().await?;
86
87        let json = serde_json::to_string(&request)?;
88        stream.write_all(json.as_bytes()).await?;
89        stream.write_all(b"\n").await?;
90        stream.flush().await?;
91
92        let mut reader = BufReader::new(stream);
93        let mut line = String::new();
94        reader.read_line(&mut line).await?;
95
96        let response: Response = serde_json::from_str(line.trim())?;
97        Ok(response)
98    }
99
100    pub async fn ping(&self) -> Result<String, DaemonError> {
101        let request = Request::new("", "", Command::Ping);
102        let response = self.send(request).await?;
103
104        match response.result {
105            ResponseResult::Ok { payload } => {
106                if let crate::ResponsePayload::Ping(ping) = payload {
107                    Ok(ping.daemon_version)
108                } else {
109                    Err(DaemonError::Internal("Unexpected response type".into()))
110                }
111            }
112            ResponseResult::Error { error } => Err(DaemonError::Internal(error.message)),
113        }
114    }
115
116    pub async fn enqueue_sync(
117        &self,
118        repo_root: &str,
119        tool: &str,
120        directory: &str,
121        force: bool,
122    ) -> Result<String, DaemonError> {
123        let request = Request::new(
124            repo_root,
125            tool,
126            Command::EnqueueSync(EnqueueSyncPayload {
127                directory: directory.to_string(),
128                force,
129            }),
130        );
131        let response = self.send(request).await?;
132
133        match response.result {
134            ResponseResult::Ok { payload } => {
135                if let crate::ResponsePayload::EnqueueSync(enqueue) = payload {
136                    Ok(enqueue.sync_id)
137                } else {
138                    Err(DaemonError::Internal("Unexpected response type".into()))
139                }
140            }
141            ResponseResult::Error { error } => Err(DaemonError::Internal(error.message)),
142        }
143    }
144
145    pub async fn wait_sync(
146        &self,
147        repo_root: &str,
148        tool: &str,
149        sync_id: &str,
150        timeout_ms: u64,
151    ) -> Result<SyncState, DaemonError> {
152        let request = Request::new(
153            repo_root,
154            tool,
155            Command::WaitSync(WaitSyncPayload {
156                sync_id: sync_id.to_string(),
157                timeout_ms,
158            }),
159        );
160        let response = self.send(request).await?;
161
162        match response.result {
163            ResponseResult::Ok { payload } => {
164                if let crate::ResponsePayload::WaitSync(wait) = payload {
165                    Ok(wait.state)
166                } else {
167                    Err(DaemonError::Internal("Unexpected response type".into()))
168                }
169            }
170            ResponseResult::Error { error } => Err(DaemonError::Internal(error.message)),
171        }
172    }
173
174    pub async fn sync(
175        &self,
176        repo_root: &str,
177        tool: &str,
178        directory: &str,
179        wait: bool,
180    ) -> Result<SyncState, DaemonError> {
181        let sync_id = self.enqueue_sync(repo_root, tool, directory, false).await?;
182
183        if wait {
184            self.wait_sync(repo_root, tool, &sync_id, 30_000).await
185        } else {
186            Ok(SyncState::Queued)
187        }
188    }
189
190    pub async fn shutdown(&self, reason: &str) -> Result<(), DaemonError> {
191        let request = Request::new(
192            "",
193            "",
194            Command::Shutdown(crate::ShutdownPayload {
195                reason: reason.to_string(),
196            }),
197        );
198        let _ = self.send(request).await;
199        Ok(())
200    }
201}
202
203impl Default for Client {
204    fn default() -> Self {
205        Self::new()
206    }
207}
208
209fn expand_tilde(path: &str) -> String {
210    if let Some(rest) = path.strip_prefix("~/")
211        && let Some(home) = dirs_next::home_dir()
212    {
213        return home.join(rest).to_string_lossy().to_string();
214    }
215    path.to_string()
216}