jocker_lib/command/
pueue.rs

1use std::{
2    collections::{BTreeMap, HashMap},
3    path::PathBuf,
4    process::Stdio,
5    time::Duration,
6};
7
8use pueue_lib::{
9    network::message::{
10        AddRequest, GroupRequest, KillRequest, LogRequest, ResetRequest, ResetTarget, Signal,
11        StreamRequest, TaskSelection,
12    },
13    Client, Group, Request, Response, Settings, Task, TaskStatus,
14};
15use snap::read::FrameDecoder;
16use tokio::{
17    process::{Child, Command},
18    sync::{mpsc::Sender, Mutex},
19    time::sleep,
20};
21
22use crate::error::{Error, InnerError, Result};
23
/// Handle on the pueue daemon, scoped to a single task group.
pub(crate) struct Pueue {
    /// Name of the pueue group every request of this instance is scoped to.
    group: String,
    /// Daemon connection reused by the request methods; the async mutex
    /// ensures only one request/response exchange uses it at a time.
    client: Mutex<Client>,
}
28
29impl Pueue {
30    pub(crate) async fn new(project_id: &str) -> Result<Self> {
31        // Try to start pueued if initial client creation fails
32        let mut client = match Self::client().await {
33            Ok(client) => client,
34            Err(_) => {
35                Pueued::daemonize().await?;
36                Self::wait_for_daemon(Duration::from_secs(10), 0).await?;
37                Self::client().await?
38            }
39        };
40        let group = Self::init_or_get_group(&mut client, project_id).await?;
41        Ok(Self {
42            group,
43            client: Mutex::new(client),
44        })
45    }
46
47    pub fn group(&self) -> &str {
48        &self.group
49    }
50
51    pub(crate) async fn client() -> Result<Client> {
52        let (settings, _) = Settings::read(&None)?;
53        let client = Client::new(settings, true)
54            .await
55            .map_err(|e| InnerError::Pueue(pueue_lib::Error::Generic(e.to_string())))?;
56        Ok(client)
57    }
58
59    pub(crate) async fn start(
60        &self,
61        process_name: String,
62        command: String,
63        path: PathBuf,
64        envs: HashMap<String, String>,
65    ) -> Result<usize> {
66        if let Some(process) = self.processes().await?.get(&process_name) {
67            self.remove(process.0).await?;
68        }
69        let mut client = self.client.lock().await;
70        client
71            .send_request(Request::Add(AddRequest {
72                command,
73                path,
74                envs,
75                group: self.group.clone(),
76                label: Some(process_name.clone()),
77                ..Default::default()
78            }))
79            .await?;
80        let rsp = client.receive_response().await?;
81        let task_id = match rsp {
82            Response::AddedTask(task) => task.task_id,
83            e => {
84                return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
85                    format!("{:?}", e),
86                ))))
87            }
88        };
89        drop(client);
90        while !matches!(
91            self.process_status(&task_id).await?,
92            Some(TaskStatus::Running { .. })
93        ) {
94            sleep(Duration::from_millis(100)).await;
95        }
96        Ok(task_id)
97    }
98
99    pub(crate) async fn processes(&self) -> Result<HashMap<String, (usize, TaskStatus)>> {
100        Ok(self
101            .processes_by_pid()
102            .await?
103            .into_iter()
104            .map(|entry| {
105                (
106                    entry.1.label.clone().unwrap_or("NONE".to_string()),
107                    (entry.1.id, entry.1.status.clone()),
108                )
109            })
110            .collect())
111    }
112
113    async fn processes_by_pid(&self) -> Result<HashMap<usize, Task>> {
114        let mut client = self.client.lock().await;
115        client.send_request(Request::Status).await?;
116        let rsp = client.receive_response().await?;
117        match rsp {
118            Response::Status(state) => {
119                let task_ids = state.task_ids_in_group(&self.group);
120                let tasks = state
121                    .tasks
122                    .into_iter()
123                    .filter(|entry| task_ids.contains(&entry.0))
124                    .map(|entry| (entry.1.id, entry.1))
125                    .collect();
126                Ok(tasks)
127            }
128            e => Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
129                format!("{:?}", e),
130            )))),
131        }
132    }
133
134    async fn process_status(&self, pid: &usize) -> Result<Option<TaskStatus>> {
135        Ok(self
136            .processes_by_pid()
137            .await?
138            .get(pid)
139            .map(|p| p.status.clone()))
140    }
141
142    pub(crate) async fn logs(
143        &self,
144        log_tx: Sender<String>,
145        process_prefix: &str,
146        pid: usize,
147        lines: Option<usize>,
148        follow: bool,
149    ) -> Result<()> {
150        match follow {
151            true => self.follow(log_tx, process_prefix, pid, lines).await,
152            false => self.log(log_tx, process_prefix, pid, lines).await,
153        }
154    }
155
156    async fn log(
157        &self,
158        log_tx: Sender<String>,
159        process_prefix: &str,
160        pid: usize,
161        lines: Option<usize>,
162    ) -> Result<()> {
163        let mut client = self.client.lock().await;
164
165        client
166            .send_request(LogRequest {
167                tasks: TaskSelection::TaskIds(vec![pid]),
168                lines,
169                send_logs: true,
170            })
171            .await?;
172        let response = client.receive_response().await?;
173        match response {
174            Response::Log(response) => {
175                for (_, text) in response {
176                    let bytes = text.output.clone().unwrap_or_default();
177                    let mut decompressor = FrameDecoder::new(bytes.as_slice());
178                    let mut buf = vec![];
179                    std::io::copy(&mut decompressor, &mut buf).unwrap();
180                    let content = String::from_utf8(buf)?;
181                    for line in content.lines() {
182                        log_tx
183                            .send(format!("{process_prefix}{}", line))
184                            .await
185                            .unwrap();
186                    }
187                }
188            }
189            other => {
190                return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
191                    format!("Received unhandled Response message during logs streaming: {other:?}"),
192                ))))
193            }
194        }
195        Ok(())
196    }
197
198    async fn follow(
199        &self,
200        log_tx: Sender<String>,
201        process_prefix: &str,
202        pid: usize,
203        lines: Option<usize>,
204    ) -> Result<()> {
205        // Create its own client to avoid blocking
206        let mut client = Self::client().await?;
207        client
208            .send_request(StreamRequest {
209                tasks: TaskSelection::TaskIds(vec![pid]),
210                lines,
211            })
212            .await?;
213
214        loop {
215            let response = client.receive_response().await?;
216            match response {
217                Response::Stream(response) => {
218                    for (_, text) in response.logs {
219                        for line in text.lines() {
220                            log_tx
221                                .send(format!("{process_prefix}{}", line))
222                                .await
223                                .unwrap();
224                        }
225                    }
226                }
227                Response::Close => break,
228                Response::Failure(text) => {
229                    return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
230                        format!("Failure during logs streaming: {text}"),
231                    ))))
232                }
233                other => {
234                    return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
235                        format!(
236                            "Received unhandled Response message during logs streaming: {other:?}"
237                        ),
238                    ))))
239                }
240            }
241        }
242        Ok(())
243    }
244
245    pub(crate) async fn stop(&self, pid: usize, kill: bool) -> Result<()> {
246        let signal = Some(if kill {
247            Signal::SigKill
248        } else {
249            Signal::SigTerm
250        });
251        let mut client = self.client.lock().await;
252        client
253            .send_request(Request::Kill(KillRequest {
254                tasks: TaskSelection::TaskIds(vec![pid]),
255                signal,
256            }))
257            .await?;
258        let rsp = client.receive_response().await?;
259        if !rsp.success() {
260            return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
261                format!("{:?}", rsp),
262            ))));
263        }
264        drop(client);
265        while !matches!(
266            self.process_status(&pid).await?,
267            Some(TaskStatus::Done { .. })
268        ) {
269            sleep(Duration::from_millis(100)).await;
270        }
271        Ok(())
272    }
273
274    async fn remove(&self, pid: usize) -> Result<()> {
275        let mut client = self.client.lock().await;
276        client.send_request(Request::Remove(vec![pid])).await?;
277        let rsp = client.receive_response().await?;
278        if !rsp.success() {
279            return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
280                format!("{:?}", rsp),
281            ))));
282        }
283        drop(client);
284        while self.process_status(&pid).await?.is_some() {
285            sleep(Duration::from_millis(100)).await;
286        }
287        Ok(())
288    }
289
290    pub(crate) async fn clean(self) -> Result<()> {
291        self.reset_group(&self.group).await?;
292        self.remove_group(&self.group).await
293    }
294
295    async fn init_or_get_group(client: &mut Client, project_id: &str) -> Result<String> {
296        let group = format!("jocker-{project_id}");
297        if !groups(client).await?.contains_key(&group) {
298            add_group(client, &group).await?;
299        }
300        Ok(group)
301    }
302
303    async fn reset_group(&self, group: &str) -> Result<()> {
304        let mut client = self.client.lock().await;
305        client
306            .send_request(ResetRequest {
307                target: ResetTarget::Groups(vec![group.to_owned()]),
308            })
309            .await?;
310        let response = client.receive_response().await?;
311        if !response.success() {
312            return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
313                format!("{:?}", response),
314            ))));
315        }
316        drop(client);
317        while !self.processes().await?.is_empty() {
318            sleep(Duration::from_millis(100)).await;
319            // TODO: Handle timeout
320        }
321        Ok(())
322    }
323
324    async fn remove_group(&self, group: &str) -> Result<()> {
325        let mut client = self.client.lock().await;
326        client
327            .send_request(GroupRequest::Remove(group.to_owned()))
328            .await?;
329        let response = client.receive_response().await?;
330        if !response.success() {
331            return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
332                format!("{:?}", response),
333            ))));
334        }
335        Ok(())
336    }
337
338    async fn wait_for_daemon(timeout: Duration, count: u32) -> Result<()> {
339        let duration = Duration::from_millis(100) * (2 ^ count);
340        if duration > timeout {
341            Self::client().await?;
342        }
343        let rsp = Self::client().await;
344        if rsp.is_err() {
345            if duration.as_secs() > 1 {
346                println!(
347                    "pueue daemon unreachable, waiting for {} seconds",
348                    duration.as_secs(),
349                );
350            }
351            sleep(duration).await;
352            Box::pin(Self::wait_for_daemon(timeout, count + 1)).await?;
353        }
354        Ok(())
355    }
356}
357
358pub(crate) struct Pueued;
359
360impl Pueued {
361    /// Launch `pueued` as a background daemon
362    pub async fn daemonize() -> Result<Child> {
363        let mut build = Command::new("pueued");
364        build.stdout(Stdio::piped()).stderr(Stdio::piped());
365        build.arg("-d");
366        let build = build
367            .spawn()
368            .map_err(Error::with_context(InnerError::Pueue(
369                pueue_lib::Error::Generic("Unable to start `pueued -d` command".to_string()),
370            )))?;
371        Ok(build)
372    }
373}
374
375// Groups
376
377async fn groups(client: &mut Client) -> Result<BTreeMap<String, Group>> {
378    client
379        .send_request(Request::Group(GroupRequest::List))
380        .await?;
381    match client.receive_response().await? {
382        Response::Group(rsp) => Ok(rsp.groups),
383        _ => unreachable!(),
384    }
385}
386
387async fn add_group(client: &mut Client, group: &str) -> Result<()> {
388    client
389        .send_request(Request::Group(GroupRequest::Add {
390            name: group.to_string(),
391            parallel_tasks: Some(0), // Unlimited
392        }))
393        .await?;
394    let response = client.receive_response().await?;
395    if !response.success() {
396        return Err(Error::new(InnerError::Pueue(pueue_lib::Error::Generic(
397            format!("{:?}", response),
398        ))));
399    }
400    Ok(())
401}
402
#[cfg(test)]
mod tests {
    use chrono::Utc;

    use super::*;

    /// Asks the daemon, over a dedicated connection, whether `name` is a known group.
    async fn group_exists(name: &str) -> bool {
        let mut client = Pueue::client().await.unwrap();
        groups(&mut client).await.unwrap().contains_key(name)
    }

    /// Group lifecycle: created on first use, reused afterwards, gone after `clean`.
    #[tokio::test]
    async fn group_init() {
        // Timestamped id so that repeated runs do not collide on the daemon.
        let project_id = format!("pueue-test-{}", Utc::now().timestamp_millis());

        // Group does not exist, create it
        let first = Pueue::new(&project_id).await.unwrap();
        assert!(group_exists(first.group()).await);

        // Group already exists
        let second = Pueue::new(&project_id).await.unwrap();
        let group_name = second.group().to_owned();
        assert!(group_exists(&group_name).await);

        second.clean().await.unwrap();
        assert!(!group_exists(&group_name).await);
    }
}
433}