1use crate::{
2 Command, DEFAULT_SOCKET_PATH, DaemonError, EnqueueSyncPayload, Request, Response,
3 ResponseResult, SyncState, WaitSyncPayload,
4};
5use std::path::Path;
6use std::process::Stdio;
7use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
8use tokio::net::UnixStream;
9
/// Upper bound on the number of connection attempts made by `Client::connect`.
const MAX_CONNECT_RETRIES: u32 = 5;
/// Base unit, in milliseconds, of the pause between connection attempts.
///
/// The pause grows linearly: it is this value multiplied by the one-based
/// attempt number.
const CONNECT_RETRY_DELAY_MS: u64 = 100;
12
/// Handle used to talk to the daemon over a Unix domain socket.
///
/// The client only stores the socket path; a fresh connection is opened for
/// every request, so cloning a `Client` is cheap and shares no state.
#[derive(Debug, Clone)]
pub struct Client {
    /// Socket path as supplied by the caller; a leading `~/` is expanded
    /// lazily each time the path is used.
    socket_path: String,
}
16
17impl Client {
18 pub fn new() -> Self {
19 Self::with_socket_path(DEFAULT_SOCKET_PATH)
20 }
21
22 pub fn with_socket_path(path: impl Into<String>) -> Self {
23 Self {
24 socket_path: path.into(),
25 }
26 }
27
28 fn expanded_socket_path(&self) -> String {
29 expand_tilde(&self.socket_path)
30 }
31
32 pub async fn connect(&self) -> Result<UnixStream, DaemonError> {
33 let socket_path = self.expanded_socket_path();
34
35 for attempt in 0..MAX_CONNECT_RETRIES {
36 match UnixStream::connect(&socket_path).await {
37 Ok(stream) => return Ok(stream),
38 Err(e)
39 if e.kind() == std::io::ErrorKind::NotFound
40 || e.kind() == std::io::ErrorKind::ConnectionRefused =>
41 {
42 if attempt == 0 {
43 self.start_daemon().await?;
44 }
45 tokio::time::sleep(tokio::time::Duration::from_millis(
46 CONNECT_RETRY_DELAY_MS * (u64::from(attempt) + 1),
47 ))
48 .await;
49 }
50 Err(e) => return Err(e.into()),
51 }
52 }
53
54 Err(DaemonError::Internal(format!(
55 "Failed to connect to daemon after {MAX_CONNECT_RETRIES} retries"
56 )))
57 }
58
59 async fn start_daemon(&self) -> Result<(), DaemonError> {
60 let socket_path = self.expanded_socket_path();
61
62 if let Some(parent) = Path::new(&socket_path).parent() {
63 tokio::fs::create_dir_all(parent).await?;
64 }
65
66 let ixcheld_path = std::env::current_exe()
67 .ok()
68 .and_then(|p| p.parent().map(|d| d.join("ixcheld")))
69 .filter(|p| p.exists())
70 .unwrap_or_else(|| "ixcheld".into());
71
72 tokio::process::Command::new(ixcheld_path)
73 .arg("--socket")
74 .arg(&socket_path)
75 .stdin(Stdio::null())
76 .stdout(Stdio::null())
77 .stderr(Stdio::null())
78 .spawn()
79 .map_err(|e| DaemonError::Internal(format!("Failed to start ixcheld: {e}")))?;
80
81 Ok(())
82 }
83
84 pub async fn send(&self, request: Request) -> Result<Response, DaemonError> {
85 let mut stream = self.connect().await?;
86
87 let json = serde_json::to_string(&request)?;
88 stream.write_all(json.as_bytes()).await?;
89 stream.write_all(b"\n").await?;
90 stream.flush().await?;
91
92 let mut reader = BufReader::new(stream);
93 let mut line = String::new();
94 reader.read_line(&mut line).await?;
95
96 let response: Response = serde_json::from_str(line.trim())?;
97 Ok(response)
98 }
99
100 pub async fn ping(&self) -> Result<String, DaemonError> {
101 let request = Request::new("", "", Command::Ping);
102 let response = self.send(request).await?;
103
104 match response.result {
105 ResponseResult::Ok { payload } => {
106 if let crate::ResponsePayload::Ping(ping) = payload {
107 Ok(ping.daemon_version)
108 } else {
109 Err(DaemonError::Internal("Unexpected response type".into()))
110 }
111 }
112 ResponseResult::Error { error } => Err(DaemonError::Internal(error.message)),
113 }
114 }
115
116 pub async fn enqueue_sync(
117 &self,
118 repo_root: &str,
119 tool: &str,
120 directory: &str,
121 force: bool,
122 ) -> Result<String, DaemonError> {
123 let request = Request::new(
124 repo_root,
125 tool,
126 Command::EnqueueSync(EnqueueSyncPayload {
127 directory: directory.to_string(),
128 force,
129 }),
130 );
131 let response = self.send(request).await?;
132
133 match response.result {
134 ResponseResult::Ok { payload } => {
135 if let crate::ResponsePayload::EnqueueSync(enqueue) = payload {
136 Ok(enqueue.sync_id)
137 } else {
138 Err(DaemonError::Internal("Unexpected response type".into()))
139 }
140 }
141 ResponseResult::Error { error } => Err(DaemonError::Internal(error.message)),
142 }
143 }
144
145 pub async fn wait_sync(
146 &self,
147 repo_root: &str,
148 tool: &str,
149 sync_id: &str,
150 timeout_ms: u64,
151 ) -> Result<SyncState, DaemonError> {
152 let request = Request::new(
153 repo_root,
154 tool,
155 Command::WaitSync(WaitSyncPayload {
156 sync_id: sync_id.to_string(),
157 timeout_ms,
158 }),
159 );
160 let response = self.send(request).await?;
161
162 match response.result {
163 ResponseResult::Ok { payload } => {
164 if let crate::ResponsePayload::WaitSync(wait) = payload {
165 Ok(wait.state)
166 } else {
167 Err(DaemonError::Internal("Unexpected response type".into()))
168 }
169 }
170 ResponseResult::Error { error } => Err(DaemonError::Internal(error.message)),
171 }
172 }
173
174 pub async fn sync(
175 &self,
176 repo_root: &str,
177 tool: &str,
178 directory: &str,
179 wait: bool,
180 ) -> Result<SyncState, DaemonError> {
181 let sync_id = self.enqueue_sync(repo_root, tool, directory, false).await?;
182
183 if wait {
184 self.wait_sync(repo_root, tool, &sync_id, 30_000).await
185 } else {
186 Ok(SyncState::Queued)
187 }
188 }
189
190 pub async fn shutdown(&self, reason: &str) -> Result<(), DaemonError> {
191 let request = Request::new(
192 "",
193 "",
194 Command::Shutdown(crate::ShutdownPayload {
195 reason: reason.to_string(),
196 }),
197 );
198 let _ = self.send(request).await;
199 Ok(())
200 }
201}
202
203impl Default for Client {
204 fn default() -> Self {
205 Self::new()
206 }
207}
208
209fn expand_tilde(path: &str) -> String {
210 if let Some(rest) = path.strip_prefix("~/")
211 && let Some(home) = dirs_next::home_dir()
212 {
213 return home.join(rest).to_string_lossy().to_string();
214 }
215 path.to_string()
216}