rci/
task.rs

1use crate::common::now;
2use crate::error::Error;
3use serde::{Deserialize, Serialize};
4use std::fmt::Write as _;
5
6#[derive(Serialize, Deserialize)]
7pub struct Info {
8    pub id: i64,
9    pub job_name: String,
10    pub user: String,
11    pub status: Status,
12    pub created: i64,
13    pub finished: Option<i64>,
14    pub elapsed: i64,
15    #[serde(skip_serializing_if = "Option::is_none")]
16    pub log: Option<String>,
17}
18
19impl Info {
20    #[allow(dead_code)]
21    pub(crate) fn update_elapsed(&mut self) {
22        if let Some(t) = self.finished {
23            self.elapsed = t - self.created;
24        } else {
25            self.elapsed = now() - self.created;
26        }
27    }
28}
29
30#[derive(Deserialize, Serialize)]
31pub struct InfoBrief {
32    pub status: Status,
33}
34
35#[derive(
36    Serialize, Deserialize, Copy, Clone, Default, Debug, Eq, PartialEq, bmart::tools::EnumStr,
37)]
38#[repr(i16)]
39#[serde(rename_all = "lowercase")]
40#[enumstr(rename_all = "UPPERCASE")]
41pub enum Status {
42    #[default]
43    Preparing = 0,
44    Queued = 1,
45    Fetching = 2,
46    Building = 10,
47    Testing = 20,
48    Releasing = 30,
49    Completed = 100,
50    Canceled = -1,
51    Terminated = -10,
52    Failed = -100,
53    Unknown = -500,
54}
55
56impl From<i16> for Status {
57    fn from(n: i16) -> Self {
58        match n {
59            v if v == Self::Preparing as i16 => Self::Preparing,
60            v if v == Self::Queued as i16 => Self::Queued,
61            v if v == Self::Fetching as i16 => Self::Fetching,
62            v if v == Self::Building as i16 => Self::Building,
63            v if v == Self::Testing as i16 => Self::Testing,
64            v if v == Self::Releasing as i16 => Self::Releasing,
65            v if v == Self::Completed as i16 => Self::Completed,
66            v if v == Self::Canceled as i16 => Self::Canceled,
67            v if v == Self::Terminated as i16 => Self::Terminated,
68            v if v == Self::Failed as i16 => Self::Failed,
69            _ => Self::Unknown,
70        }
71    }
72}
73
74impl Status {
75    #[allow(dead_code)]
76    pub fn is_finished(self) -> bool {
77        matches!(
78            self,
79            Status::Completed | Status::Canceled | Status::Terminated | Status::Failed
80        )
81    }
82}
83
84#[derive(Serialize, Deserialize, Default, Copy, Clone, bmart::tools::EnumStr)]
85#[serde(rename_all = "lowercase")]
86pub enum FilterStatus {
87    #[default]
88    All,
89    Active,
90    Finished,
91}
92
93#[derive(Serialize, Deserialize)]
94pub struct Filter {
95    pub t_start: Option<i64>,
96    pub t_end: Option<i64>,
97    #[serde(default)]
98    pub status: FilterStatus,
99}
100
101impl Filter {
102    #[allow(dead_code)]
103    pub fn to_sql_cond(&self) -> Result<String, Error> {
104        let mut cond = String::new();
105        write!(
106            cond,
107            " AND created>={}",
108            self.t_start.unwrap_or(now() - 86400)
109        )?;
110        if let Some(t) = self.t_end {
111            write!(cond, " AND created<={}", t)?;
112        }
113        match self.status {
114            FilterStatus::All => {}
115            FilterStatus::Active => {
116                write!(cond, " AND finished IS NULL")?;
117            }
118            FilterStatus::Finished => {
119                write!(cond, " AND finished IS NOT NULL")?;
120            }
121        }
122        Ok(cond)
123    }
124}
125
126#[cfg(feature = "ci")]
127pub mod ci {
128    use super::{Info, Status};
129    use crate::ci::{command, db, processor};
130    use crate::common::internal::{log_dir, task_pool, timeout_fail_notify, work_dir};
131    use crate::common::tki;
132    use crate::error::Error;
133    use crate::job::Job;
134    use log::error;
135    use std::fmt;
136    use std::future::Future;
137    use std::path::{Path, PathBuf};
138    use std::sync::atomic;
139    use std::sync::Arc;
140    use std::time::Duration;
141    use tokio::fs;
142    use tokio::io::AsyncWriteExt;
143    use tokio::process::Child;
144    use tokio::task::JoinHandle;
145
146    #[inline]
147    pub fn list(
148        filter: &super::Filter,
149    ) -> impl Future<Output = Result<Vec<super::Info>, Error>> + '_ {
150        db::list_tasks(filter)
151    }
152
153    #[inline]
154    pub fn get_info(id: i64) -> impl Future<Output = Result<Option<super::Info>, Error>> {
155        db::get_task(id)
156    }
157
158    #[inline]
159    pub fn get_status(id: i64) -> impl Future<Output = Result<Option<super::Status>, Error>> {
160        db::get_task_status(id)
161    }
162
163    async fn msg(log_file: &Path, message: impl fmt::Display) {
164        if let Ok(mut lg) = tokio::fs::OpenOptions::new()
165            .create(true)
166            .truncate(false)
167            .append(true)
168            .write(true)
169            .open(log_file)
170            .await
171        {
172            let _ = lg.write_all(&[b'=', b'=', b'=', b' ']).await;
173            let _ = lg.write_all(message.to_string().as_bytes()).await;
174            let _ = lg.write_all(&[b' ', b'=', b'=', b'=']).await;
175            let _ = lg.write_all(&[b'\n']).await;
176        }
177    }
178
179    pub fn log_file(job_name: &str, task_id: i64) -> PathBuf {
180        let mut lg = log_dir().to_owned();
181        lg.push(format!("{}_{}.log", job_name, task_id));
182        lg
183    }
184
185    pub struct Task {
186        id: i64,
187        job_name: String,
188        user: User,
189        job: Job,
190        pid: parking_lot::Mutex<Option<u32>>,
191        run_fut: parking_lot::Mutex<Option<JoinHandle<()>>>,
192        timeout_fut: parking_lot::Mutex<Option<JoinHandle<()>>>,
193        started: atomic::AtomicBool,
194        status: parking_lot::Mutex<Status>,
195    }
196
197    impl Task {
198        fn sh(&self, log_file: &Path) -> Result<Child, Error> {
199            let status = *self.status.lock();
200            let sh = command::spawn_sh(self.id, status, &self.job_name, log_file)?;
201            self.pid.lock().replace(sh.id().unwrap());
202            Ok(sh)
203        }
204        async fn process(&self, input: String, log_file: &Path) -> Result<(), Error> {
205            let sh = match self.sh(log_file) {
206                Ok(v) => v,
207                Err(e) => {
208                    error!("{}: unable to spawn sh {}", self, e);
209                    self.mark_failed(log_file).await;
210                    return Err(e);
211                }
212            };
213            if let Err(e) = command::process(sh, log_file, Some(input), true).await {
214                error!("{}: {}", self, e);
215                self.mark_failed(log_file).await;
216                return Err(e);
217            };
218            Ok(())
219        }
220        fn process_stage<'a>(
221            &'a self,
222            work_dir: &str,
223            input: &str,
224            log_file: &'a Path,
225        ) -> impl Future<Output = Result<(), Error>> + 'a {
226            let input_cd = format!("cd \"{}\"\n{}", work_dir, input);
227            self.process(input_cd, log_file)
228        }
229        async fn mark_failed(&self, log_file: &Path) {
230            if !(*self.status.lock()).is_finished() {
231                self.set_status(Status::Failed).await;
232                msg(log_file, "TASK FAILED").await;
233                let pid = self.pid.lock().take();
234                if let Some(pid) = pid {
235                    bmart::process::kill_pstree(pid, Some(tki()), true).await;
236                }
237                self.run_fut.lock().take();
238                if let Some(fut) = self.timeout_fut.lock().take() {
239                    fut.abort();
240                }
241                if let Some(ref fail) = self.job.on.fail {
242                    if let Ok(sh) = self.sh(log_file) {
243                        let _ = command::process(sh, log_file, Some(fail.clone()), true).await;
244                    }
245                }
246            }
247            let _ = processor::mark_finished(self.job_name.clone(), self.id).await;
248        }
249        pub async fn info(task_id: i64, full: bool) -> Result<Option<Info>, Error> {
250            if let Some(mut info) = db::get_task(task_id).await? {
251                if full {
252                    let lg = log_file(&info.job_name, info.id);
253                    if let Ok(log) = tokio::fs::read_to_string(lg).await {
254                        info.log.replace(log);
255                    }
256                }
257                Ok(Some(info))
258            } else {
259                Ok(None)
260            }
261        }
262        pub async fn delete(task_id: i64) -> Result<(), Error> {
263            if let Some(info) = db::get_task(task_id).await? {
264                if info.status.is_finished() {
265                    db::delete_task(task_id).await?;
266                    let _ = tokio::fs::remove_file(log_file(&info.job_name, task_id)).await;
267                    Ok(())
268                } else {
269                    Err(Error::busy("task is not completed"))
270                }
271            } else {
272                Err(Error::not_found("no such task"))
273            }
274        }
275        pub async fn create(job_name: String, user: User, job: Job) -> Result<Self, Error> {
276            Ok(Self {
277                id: db::create_task(&job_name, &user).await?,
278                job_name,
279                user,
280                job,
281                pid: <_>::default(),
282                run_fut: <_>::default(),
283                timeout_fut: <_>::default(),
284                started: <_>::default(),
285                status: <_>::default(),
286            })
287        }
288        async fn set_status(&self, status: Status) {
289            *self.status.lock() = status;
290            db::set_task_status(self.id, status).await.unwrap();
291        }
292        async fn run_task(self: Arc<Self>) -> Result<(), Error> {
293            self.set_status(Status::Queued).await;
294            let _locker = task_pool().get().await;
295            self.started.store(true, atomic::Ordering::SeqCst);
296            let lg = log_file(&self.job_name, self.id);
297            let mut task_work_dir = work_dir().to_owned();
298            task_work_dir.push(&self.job_name);
299            let wdir = task_work_dir.to_string_lossy();
300            macro_rules! process_stage {
301                ($input: expr) => {
302                    self.process_stage(&wdir, $input, &lg).await?;
303                };
304            }
305            macro_rules! msg {
306                ($message: expr) => {
307                    msg(&lg, $message).await;
308                };
309            }
310            let task = self.clone();
311            let log_file_c = lg.clone();
312            self.timeout_fut.lock().replace(tokio::spawn(async move {
313                tokio::time::sleep(Duration::from_secs(task.job.timeout)).await;
314                msg(&log_file_c, "TIMED OUT, TERMINATING").await;
315                error!("task {} timed out, terminating", task);
316                let _ = tokio::time::timeout(timeout_fail_notify(), async {
317                    if let Some(ref fail) = task.job.on.fail {
318                        if let Ok(sh) = command::spawn_sh(
319                            task.id,
320                            Status::Terminated,
321                            &task.job_name,
322                            &log_file_c,
323                        ) {
324                            let _ =
325                                command::process(sh, &log_file_c, Some(fail.clone()), true).await;
326                        }
327                    }
328                })
329                .await;
330                let _ = processor::terminate(task.job_name.clone(), Some(task.id)).await;
331            }));
332            if let Some(ref url) = self.job.git.url {
333                // FETCH
334                self.set_status(Status::Fetching).await;
335                let branch = &self.job.git.branch;
336                msg!(format!("Fetching {url} branch {branch}"));
337                let input = if task_work_dir.exists() {
338                    format!(
339                        r#"
340cd "{wdir}"
341git remote set-url origin "{url}"
342git pull origin "{branch}"
343git checkout "{branch}"
344"#,
345                    )
346                } else {
347                    format!(
348                        r#"
349git clone "{url}" "{wdir}"
350cd "{wdir}"
351git checkout "{branch}"
352"#
353                    )
354                };
355                self.process(input, &lg).await?;
356            } else {
357                fs::create_dir_all(&task_work_dir).await?;
358            }
359            if let Some(ref build) = self.job.commands.build {
360                self.set_status(Status::Building).await;
361                msg!("Building");
362                process_stage!(build);
363            }
364            if let Some(ref test) = self.job.commands.test {
365                self.set_status(Status::Testing).await;
366                msg!("Testing");
367                process_stage!(test);
368            }
369            if let Some(ref release) = self.job.commands.release {
370                self.set_status(Status::Releasing).await;
371                msg!("Releasing");
372                process_stage!(release);
373            }
374            self.run_fut.lock().take();
375            self.set_status(Status::Completed).await;
376            msg!("TASK COMPLETED");
377            if let Some(ref success) = self.job.on.success {
378                process_stage!(success);
379            }
380            if let Some(fut) = self.timeout_fut.lock().take() {
381                fut.abort();
382            }
383            let _ = processor::mark_finished(self.job_name.clone(), self.id).await;
384            Ok(())
385        }
386        #[allow(clippy::too_many_lines)]
387        pub async fn run(self: Arc<Self>) {
388            let task = self.clone();
389            let fut = tokio::spawn(async move {
390                let _ = task.run_task().await;
391            });
392            self.run_fut.lock().replace(fut);
393        }
394        /// # Panics
395        ///
396        /// Will panic if DB is unavailable
397        pub fn terminate(&self) {
398            if let Some(fut) = self.timeout_fut.lock().take() {
399                fut.abort();
400            }
401            if let Some(fut) = self.run_fut.lock().take() {
402                fut.abort();
403                let id = self.id;
404                *self.status.lock() = Status::Terminated;
405                tokio::spawn(async move {
406                    db::set_task_status(id, Status::Terminated).await.unwrap();
407                });
408            } else if !self.started.load(atomic::Ordering::SeqCst) {
409                let id = self.id;
410                *self.status.lock() = Status::Canceled;
411                tokio::spawn(async move {
412                    db::set_task_status(id, Status::Canceled).await.unwrap();
413                });
414            }
415            if let Some(pid) = self.pid.lock().take() {
416                tokio::spawn(async move {
417                    bmart::process::kill_pstree(pid, Some(tki()), true).await;
418                });
419            }
420        }
421        /// # Panics
422        ///
423        /// Will panic if DB is unavailable
424        pub async fn terminate_wait(&self) {
425            if let Some(fut) = self.timeout_fut.lock().take() {
426                fut.abort();
427            }
428            let run_fut = self.run_fut.lock().take();
429            if let Some(fut) = run_fut {
430                fut.abort();
431                let id = self.id;
432                *self.status.lock() = Status::Terminated;
433                db::set_task_status(id, Status::Terminated).await.unwrap();
434            } else if !self.started.load(atomic::Ordering::SeqCst) {
435                let id = self.id;
436                *self.status.lock() = Status::Canceled;
437                db::set_task_status(id, Status::Canceled).await.unwrap();
438            }
439            if let Some(pid) = self.pid.lock().take() {
440                tokio::spawn(async move {
441                    bmart::process::kill_pstree(pid, Some(tki()), true).await;
442                });
443            }
444        }
445        pub fn id(&self) -> i64 {
446            self.id
447        }
448        pub fn job_name(&self) -> &str {
449            &self.job_name
450        }
451    }
452
453    impl Drop for Task {
454        fn drop(&mut self) {
455            self.terminate();
456        }
457    }
458
459    impl fmt::Display for Task {
460        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461            write!(f, "{} (job {}, user {})", self.id, self.job_name, self.user)
462        }
463    }
464
465    pub enum User {
466        Authenticated(String),
467        Triggered(Trigger),
468    }
469
470    impl User {
471        pub fn admin() -> Self {
472            Self::authenticated("admin")
473        }
474        pub fn authenticated(user: &str) -> Self {
475            Self::Authenticated(user.to_owned())
476        }
477        pub fn triggered(source: &str, user: &str) -> Self {
478            Self::Triggered(Trigger {
479                source: source.to_owned(),
480                user: user.to_owned(),
481            })
482        }
483    }
484
485    impl fmt::Display for User {
486        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487            match self {
488                User::Authenticated(user) => write!(f, "{}", user),
489                User::Triggered(t) => write!(f, "!trig.{}.{}", t.source, t.user),
490            }
491        }
492    }
493
494    pub struct Trigger {
495        source: String,
496        user: String,
497    }
498}