1use std::{
2 collections::{BTreeMap, HashMap},
3 path::PathBuf,
4 process::Stdio,
5 time::Duration,
6};
7
8use pueue_lib::{
9 network::message::{
10 AddRequest, GroupRequest, KillRequest, LogRequest, ResetRequest, ResetTarget, Signal,
11 StreamRequest, TaskSelection,
12 },
13 Client, Group, Request, Response, Settings, Task, TaskStatus,
14};
15use snap::read::FrameDecoder;
16use tokio::{
17 process::{Child, Command},
18 sync::{mpsc::Sender, Mutex},
19 time::sleep,
20};
21
22use crate::error::{Error, InnerError, Result};
23
24pub(crate) struct Pueue {
25 group: String,
26 client: Mutex<Client>,
27}
28
29impl Pueue {
30 pub(crate) async fn new(project_id: &str) -> Result<Self> {
31 let mut client = match Self::client().await {
33 Ok(client) => client,
34 Err(_) => {
35 Pueued::daemonize().await?;
36 Self::wait_for_daemon(Duration::from_secs(10), 0).await?;
37 Self::client().await?
38 }
39 };
40 let group = Self::init_or_get_group(&mut client, project_id).await?;
41 Ok(Self {
42 group,
43 client: Mutex::new(client),
44 })
45 }
46
47 pub fn group(&self) -> &str {
48 &self.group
49 }
50
51 pub(crate) async fn client() -> Result<Client> {
52 let (settings, _) = Settings::read(&None)?;
53 let client = Client::new(settings, true)
54 .await
55 .map_err(|e| InnerError::Pueue(pueue_lib::Error::Generic(e.to_string())))?;
56 Ok(client)
57 }
58
59 pub(crate) async fn start(
60 &self,
61 process_name: String,
62 command: String,
63 path: PathBuf,
64 envs: HashMap<String, String>,
65 ) -> Result<usize> {
66 if let Some(process) = self.processes().await?.get(&process_name) {
67 self.remove(process.0).await?;
68 }
69 let mut client = self.client.lock().await;
70 client
71 .send_request(Request::Add(AddRequest {
72 command,
73 path,
74 envs,
75 group: self.group.clone(),
76 label: Some(process_name.clone()),
77 ..Default::default()
78 }))
79 .await?;
80 let rsp = client.receive_response().await?;
81 let task_id = match rsp {
82 Response::AddedTask(task) => task.task_id,
83 e => {
84 return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
85 format!("{:?}", e),
86 ))))
87 }
88 };
89 drop(client);
90 while !matches!(
91 self.process_status(&task_id).await?,
92 Some(TaskStatus::Running { .. })
93 ) {
94 sleep(Duration::from_millis(100)).await;
95 }
96 Ok(task_id)
97 }
98
99 pub(crate) async fn processes(&self) -> Result<HashMap<String, (usize, TaskStatus)>> {
100 Ok(self
101 .processes_by_pid()
102 .await?
103 .into_iter()
104 .map(|entry| {
105 (
106 entry.1.label.clone().unwrap_or("NONE".to_string()),
107 (entry.1.id, entry.1.status.clone()),
108 )
109 })
110 .collect())
111 }
112
113 async fn processes_by_pid(&self) -> Result<HashMap<usize, Task>> {
114 let mut client = self.client.lock().await;
115 client.send_request(Request::Status).await?;
116 let rsp = client.receive_response().await?;
117 match rsp {
118 Response::Status(state) => {
119 let task_ids = state.task_ids_in_group(&self.group);
120 let tasks = state
121 .tasks
122 .into_iter()
123 .filter(|entry| task_ids.contains(&entry.0))
124 .map(|entry| (entry.1.id, entry.1))
125 .collect();
126 Ok(tasks)
127 }
128 e => Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
129 format!("{:?}", e),
130 )))),
131 }
132 }
133
134 async fn process_status(&self, pid: &usize) -> Result<Option<TaskStatus>> {
135 Ok(self
136 .processes_by_pid()
137 .await?
138 .get(pid)
139 .map(|p| p.status.clone()))
140 }
141
142 pub(crate) async fn logs(
143 &self,
144 log_tx: Sender<String>,
145 process_prefix: &str,
146 pid: usize,
147 lines: Option<usize>,
148 follow: bool,
149 ) -> Result<()> {
150 match follow {
151 true => self.follow(log_tx, process_prefix, pid, lines).await,
152 false => self.log(log_tx, process_prefix, pid, lines).await,
153 }
154 }
155
156 async fn log(
157 &self,
158 log_tx: Sender<String>,
159 process_prefix: &str,
160 pid: usize,
161 lines: Option<usize>,
162 ) -> Result<()> {
163 let mut client = self.client.lock().await;
164
165 client
166 .send_request(LogRequest {
167 tasks: TaskSelection::TaskIds(vec![pid]),
168 lines,
169 send_logs: true,
170 })
171 .await?;
172 let response = client.receive_response().await?;
173 match response {
174 Response::Log(response) => {
175 for (_, text) in response {
176 let bytes = text.output.clone().unwrap_or_default();
177 let mut decompressor = FrameDecoder::new(bytes.as_slice());
178 let mut buf = vec![];
179 std::io::copy(&mut decompressor, &mut buf).unwrap();
180 let content = String::from_utf8(buf)?;
181 for line in content.lines() {
182 log_tx
183 .send(format!("{process_prefix}{}", line))
184 .await
185 .unwrap();
186 }
187 }
188 }
189 other => {
190 return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
191 format!("Received unhandled Response message during logs streaming: {other:?}"),
192 ))))
193 }
194 }
195 Ok(())
196 }
197
198 async fn follow(
199 &self,
200 log_tx: Sender<String>,
201 process_prefix: &str,
202 pid: usize,
203 lines: Option<usize>,
204 ) -> Result<()> {
205 let mut client = Self::client().await?;
207 client
208 .send_request(StreamRequest {
209 tasks: TaskSelection::TaskIds(vec![pid]),
210 lines,
211 })
212 .await?;
213
214 loop {
215 let response = client.receive_response().await?;
216 match response {
217 Response::Stream(response) => {
218 for (_, text) in response.logs {
219 for line in text.lines() {
220 log_tx
221 .send(format!("{process_prefix}{}", line))
222 .await
223 .unwrap();
224 }
225 }
226 }
227 Response::Close => break,
228 Response::Failure(text) => {
229 return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
230 format!("Failure during logs streaming: {text}"),
231 ))))
232 }
233 other => {
234 return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
235 format!(
236 "Received unhandled Response message during logs streaming: {other:?}"
237 ),
238 ))))
239 }
240 }
241 }
242 Ok(())
243 }
244
245 pub(crate) async fn stop(&self, pid: usize, kill: bool) -> Result<()> {
246 let signal = Some(if kill {
247 Signal::SigKill
248 } else {
249 Signal::SigTerm
250 });
251 let mut client = self.client.lock().await;
252 client
253 .send_request(Request::Kill(KillRequest {
254 tasks: TaskSelection::TaskIds(vec![pid]),
255 signal,
256 }))
257 .await?;
258 let rsp = client.receive_response().await?;
259 if !rsp.success() {
260 return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
261 format!("{:?}", rsp),
262 ))));
263 }
264 drop(client);
265 while !matches!(
266 self.process_status(&pid).await?,
267 Some(TaskStatus::Done { .. })
268 ) {
269 sleep(Duration::from_millis(100)).await;
270 }
271 Ok(())
272 }
273
274 async fn remove(&self, pid: usize) -> Result<()> {
275 let mut client = self.client.lock().await;
276 client.send_request(Request::Remove(vec![pid])).await?;
277 let rsp = client.receive_response().await?;
278 if !rsp.success() {
279 return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
280 format!("{:?}", rsp),
281 ))));
282 }
283 drop(client);
284 while self.process_status(&pid).await?.is_some() {
285 sleep(Duration::from_millis(100)).await;
286 }
287 Ok(())
288 }
289
290 pub(crate) async fn clean(self) -> Result<()> {
291 self.reset_group(&self.group).await?;
292 self.remove_group(&self.group).await
293 }
294
295 async fn init_or_get_group(client: &mut Client, project_id: &str) -> Result<String> {
296 let group = format!("jocker-{project_id}");
297 if !groups(client).await?.contains_key(&group) {
298 add_group(client, &group).await?;
299 }
300 Ok(group)
301 }
302
303 async fn reset_group(&self, group: &str) -> Result<()> {
304 let mut client = self.client.lock().await;
305 client
306 .send_request(ResetRequest {
307 target: ResetTarget::Groups(vec![group.to_owned()]),
308 })
309 .await?;
310 let response = client.receive_response().await?;
311 if !response.success() {
312 return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
313 format!("{:?}", response),
314 ))));
315 }
316 drop(client);
317 while !self.processes().await?.is_empty() {
318 sleep(Duration::from_millis(100)).await;
319 }
321 Ok(())
322 }
323
324 async fn remove_group(&self, group: &str) -> Result<()> {
325 let mut client = self.client.lock().await;
326 client
327 .send_request(GroupRequest::Remove(group.to_owned()))
328 .await?;
329 let response = client.receive_response().await?;
330 if !response.success() {
331 return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
332 format!("{:?}", response),
333 ))));
334 }
335 Ok(())
336 }
337
338 async fn wait_for_daemon(timeout: Duration, count: u32) -> Result<()> {
339 let duration = Duration::from_millis(100) * (2 ^ count);
340 if duration > timeout {
341 Self::client().await?;
342 }
343 let rsp = Self::client().await;
344 if rsp.is_err() {
345 if duration.as_secs() > 1 {
346 println!(
347 "pueue daemon unreachable, waiting for {} seconds",
348 duration.as_secs(),
349 );
350 }
351 sleep(duration).await;
352 Box::pin(Self::wait_for_daemon(timeout, count + 1)).await?;
353 }
354 Ok(())
355 }
356}
357
358pub(crate) struct Pueued;
359
360impl Pueued {
361 pub async fn daemonize() -> Result<Child> {
363 let mut build = Command::new("pueued");
364 build.stdout(Stdio::piped()).stderr(Stdio::piped());
365 build.arg("-d");
366 let build = build
367 .spawn()
368 .map_err(Error::with_context(InnerError::Pueue(
369 pueue_lib::Error::Generic("Unable to start `pueued -d` command".to_string()),
370 )))?;
371 Ok(build)
372 }
373}
374
375async fn groups(client: &mut Client) -> Result<BTreeMap<String, Group>> {
378 client
379 .send_request(Request::Group(GroupRequest::List))
380 .await?;
381 match client.receive_response().await? {
382 Response::Group(rsp) => Ok(rsp.groups),
383 _ => unreachable!(),
384 }
385}
386
387async fn add_group(client: &mut Client, group: &str) -> Result<()> {
388 client
389 .send_request(Request::Group(GroupRequest::Add {
390 name: group.to_string(),
391 parallel_tasks: Some(0), }))
393 .await?;
394 let response = client.receive_response().await?;
395 if !response.success() {
396 return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
397 format!("{:?}", response),
398 ))));
399 }
400 Ok(())
401}
402
#[cfg(test)]
mod tests {
    use chrono::Utc;

    use super::*;

    /// Reports whether the daemon lists a group called `name`, using a fresh
    /// connection that is closed again on return.
    async fn group_exists(name: &str) -> bool {
        let mut client = Pueue::client().await.unwrap();
        groups(&mut client).await.unwrap().contains_key(name)
    }

    /// Creating a handle twice for the same project keeps the group available,
    /// and `clean` removes it again.
    #[tokio::test]
    async fn group_init() {
        let project_id = format!("pueue-test-{}", Utc::now().timestamp_millis());

        // First initialization: the group has to show up on the daemon.
        let first = Pueue::new(&project_id).await.unwrap();
        let group_name = first.group;
        assert!(group_exists(&group_name).await);

        // Second initialization for the same project: the group is still listed.
        let second = Pueue::new(&project_id).await.unwrap();
        let group_name = second.group.clone();
        assert!(group_exists(&group_name).await);

        // Cleaning the handle removes the group.
        second.clean().await.unwrap();
        assert!(!group_exists(&group_name).await);
    }
}