1use crate::common::now;
2use crate::error::Error;
3use serde::{Deserialize, Serialize};
4use std::fmt::Write as _;
5
6#[derive(Serialize, Deserialize)]
7pub struct Info {
8 pub id: i64,
9 pub job_name: String,
10 pub user: String,
11 pub status: Status,
12 pub created: i64,
13 pub finished: Option<i64>,
14 pub elapsed: i64,
15 #[serde(skip_serializing_if = "Option::is_none")]
16 pub log: Option<String>,
17}
18
19impl Info {
20 #[allow(dead_code)]
21 pub(crate) fn update_elapsed(&mut self) {
22 if let Some(t) = self.finished {
23 self.elapsed = t - self.created;
24 } else {
25 self.elapsed = now() - self.created;
26 }
27 }
28}
29
30#[derive(Deserialize, Serialize)]
31pub struct InfoBrief {
32 pub status: Status,
33}
34
35#[derive(
36 Serialize, Deserialize, Copy, Clone, Default, Debug, Eq, PartialEq, bmart::tools::EnumStr,
37)]
38#[repr(i16)]
39#[serde(rename_all = "lowercase")]
40#[enumstr(rename_all = "UPPERCASE")]
41pub enum Status {
42 #[default]
43 Preparing = 0,
44 Queued = 1,
45 Fetching = 2,
46 Building = 10,
47 Testing = 20,
48 Releasing = 30,
49 Completed = 100,
50 Canceled = -1,
51 Terminated = -10,
52 Failed = -100,
53 Unknown = -500,
54}
55
56impl From<i16> for Status {
57 fn from(n: i16) -> Self {
58 match n {
59 v if v == Self::Preparing as i16 => Self::Preparing,
60 v if v == Self::Queued as i16 => Self::Queued,
61 v if v == Self::Fetching as i16 => Self::Fetching,
62 v if v == Self::Building as i16 => Self::Building,
63 v if v == Self::Testing as i16 => Self::Testing,
64 v if v == Self::Releasing as i16 => Self::Releasing,
65 v if v == Self::Completed as i16 => Self::Completed,
66 v if v == Self::Canceled as i16 => Self::Canceled,
67 v if v == Self::Terminated as i16 => Self::Terminated,
68 v if v == Self::Failed as i16 => Self::Failed,
69 _ => Self::Unknown,
70 }
71 }
72}
73
74impl Status {
75 #[allow(dead_code)]
76 pub fn is_finished(self) -> bool {
77 matches!(
78 self,
79 Status::Completed | Status::Canceled | Status::Terminated | Status::Failed
80 )
81 }
82}
83
84#[derive(Serialize, Deserialize, Default, Copy, Clone, bmart::tools::EnumStr)]
85#[serde(rename_all = "lowercase")]
86pub enum FilterStatus {
87 #[default]
88 All,
89 Active,
90 Finished,
91}
92
93#[derive(Serialize, Deserialize)]
94pub struct Filter {
95 pub t_start: Option<i64>,
96 pub t_end: Option<i64>,
97 #[serde(default)]
98 pub status: FilterStatus,
99}
100
101impl Filter {
102 #[allow(dead_code)]
103 pub fn to_sql_cond(&self) -> Result<String, Error> {
104 let mut cond = String::new();
105 write!(
106 cond,
107 " AND created>={}",
108 self.t_start.unwrap_or(now() - 86400)
109 )?;
110 if let Some(t) = self.t_end {
111 write!(cond, " AND created<={}", t)?;
112 }
113 match self.status {
114 FilterStatus::All => {}
115 FilterStatus::Active => {
116 write!(cond, " AND finished IS NULL")?;
117 }
118 FilterStatus::Finished => {
119 write!(cond, " AND finished IS NOT NULL")?;
120 }
121 }
122 Ok(cond)
123 }
124}
125
126#[cfg(feature = "ci")]
127pub mod ci {
128 use super::{Info, Status};
129 use crate::ci::{command, db, processor};
130 use crate::common::internal::{log_dir, task_pool, timeout_fail_notify, work_dir};
131 use crate::common::tki;
132 use crate::error::Error;
133 use crate::job::Job;
134 use log::error;
135 use std::fmt;
136 use std::future::Future;
137 use std::path::{Path, PathBuf};
138 use std::sync::atomic;
139 use std::sync::Arc;
140 use std::time::Duration;
141 use tokio::fs;
142 use tokio::io::AsyncWriteExt;
143 use tokio::process::Child;
144 use tokio::task::JoinHandle;
145
146 #[inline]
147 pub fn list(
148 filter: &super::Filter,
149 ) -> impl Future<Output = Result<Vec<super::Info>, Error>> + '_ {
150 db::list_tasks(filter)
151 }
152
153 #[inline]
154 pub fn get_info(id: i64) -> impl Future<Output = Result<Option<super::Info>, Error>> {
155 db::get_task(id)
156 }
157
158 #[inline]
159 pub fn get_status(id: i64) -> impl Future<Output = Result<Option<super::Status>, Error>> {
160 db::get_task_status(id)
161 }
162
163 async fn msg(log_file: &Path, message: impl fmt::Display) {
164 if let Ok(mut lg) = tokio::fs::OpenOptions::new()
165 .create(true)
166 .truncate(false)
167 .append(true)
168 .write(true)
169 .open(log_file)
170 .await
171 {
172 let _ = lg.write_all(&[b'=', b'=', b'=', b' ']).await;
173 let _ = lg.write_all(message.to_string().as_bytes()).await;
174 let _ = lg.write_all(&[b' ', b'=', b'=', b'=']).await;
175 let _ = lg.write_all(&[b'\n']).await;
176 }
177 }
178
179 pub fn log_file(job_name: &str, task_id: i64) -> PathBuf {
180 let mut lg = log_dir().to_owned();
181 lg.push(format!("{}_{}.log", job_name, task_id));
182 lg
183 }
184
185 pub struct Task {
186 id: i64,
187 job_name: String,
188 user: User,
189 job: Job,
190 pid: parking_lot::Mutex<Option<u32>>,
191 run_fut: parking_lot::Mutex<Option<JoinHandle<()>>>,
192 timeout_fut: parking_lot::Mutex<Option<JoinHandle<()>>>,
193 started: atomic::AtomicBool,
194 status: parking_lot::Mutex<Status>,
195 }
196
197 impl Task {
198 fn sh(&self, log_file: &Path) -> Result<Child, Error> {
199 let status = *self.status.lock();
200 let sh = command::spawn_sh(self.id, status, &self.job_name, log_file)?;
201 self.pid.lock().replace(sh.id().unwrap());
202 Ok(sh)
203 }
204 async fn process(&self, input: String, log_file: &Path) -> Result<(), Error> {
205 let sh = match self.sh(log_file) {
206 Ok(v) => v,
207 Err(e) => {
208 error!("{}: unable to spawn sh {}", self, e);
209 self.mark_failed(log_file).await;
210 return Err(e);
211 }
212 };
213 if let Err(e) = command::process(sh, log_file, Some(input), true).await {
214 error!("{}: {}", self, e);
215 self.mark_failed(log_file).await;
216 return Err(e);
217 };
218 Ok(())
219 }
220 fn process_stage<'a>(
221 &'a self,
222 work_dir: &str,
223 input: &str,
224 log_file: &'a Path,
225 ) -> impl Future<Output = Result<(), Error>> + 'a {
226 let input_cd = format!("cd \"{}\"\n{}", work_dir, input);
227 self.process(input_cd, log_file)
228 }
229 async fn mark_failed(&self, log_file: &Path) {
230 if !(*self.status.lock()).is_finished() {
231 self.set_status(Status::Failed).await;
232 msg(log_file, "TASK FAILED").await;
233 let pid = self.pid.lock().take();
234 if let Some(pid) = pid {
235 bmart::process::kill_pstree(pid, Some(tki()), true).await;
236 }
237 self.run_fut.lock().take();
238 if let Some(fut) = self.timeout_fut.lock().take() {
239 fut.abort();
240 }
241 if let Some(ref fail) = self.job.on.fail {
242 if let Ok(sh) = self.sh(log_file) {
243 let _ = command::process(sh, log_file, Some(fail.clone()), true).await;
244 }
245 }
246 }
247 let _ = processor::mark_finished(self.job_name.clone(), self.id).await;
248 }
249 pub async fn info(task_id: i64, full: bool) -> Result<Option<Info>, Error> {
250 if let Some(mut info) = db::get_task(task_id).await? {
251 if full {
252 let lg = log_file(&info.job_name, info.id);
253 if let Ok(log) = tokio::fs::read_to_string(lg).await {
254 info.log.replace(log);
255 }
256 }
257 Ok(Some(info))
258 } else {
259 Ok(None)
260 }
261 }
262 pub async fn delete(task_id: i64) -> Result<(), Error> {
263 if let Some(info) = db::get_task(task_id).await? {
264 if info.status.is_finished() {
265 db::delete_task(task_id).await?;
266 let _ = tokio::fs::remove_file(log_file(&info.job_name, task_id)).await;
267 Ok(())
268 } else {
269 Err(Error::busy("task is not completed"))
270 }
271 } else {
272 Err(Error::not_found("no such task"))
273 }
274 }
275 pub async fn create(job_name: String, user: User, job: Job) -> Result<Self, Error> {
276 Ok(Self {
277 id: db::create_task(&job_name, &user).await?,
278 job_name,
279 user,
280 job,
281 pid: <_>::default(),
282 run_fut: <_>::default(),
283 timeout_fut: <_>::default(),
284 started: <_>::default(),
285 status: <_>::default(),
286 })
287 }
288 async fn set_status(&self, status: Status) {
289 *self.status.lock() = status;
290 db::set_task_status(self.id, status).await.unwrap();
291 }
292 async fn run_task(self: Arc<Self>) -> Result<(), Error> {
293 self.set_status(Status::Queued).await;
294 let _locker = task_pool().get().await;
295 self.started.store(true, atomic::Ordering::SeqCst);
296 let lg = log_file(&self.job_name, self.id);
297 let mut task_work_dir = work_dir().to_owned();
298 task_work_dir.push(&self.job_name);
299 let wdir = task_work_dir.to_string_lossy();
300 macro_rules! process_stage {
301 ($input: expr) => {
302 self.process_stage(&wdir, $input, &lg).await?;
303 };
304 }
305 macro_rules! msg {
306 ($message: expr) => {
307 msg(&lg, $message).await;
308 };
309 }
310 let task = self.clone();
311 let log_file_c = lg.clone();
312 self.timeout_fut.lock().replace(tokio::spawn(async move {
313 tokio::time::sleep(Duration::from_secs(task.job.timeout)).await;
314 msg(&log_file_c, "TIMED OUT, TERMINATING").await;
315 error!("task {} timed out, terminating", task);
316 let _ = tokio::time::timeout(timeout_fail_notify(), async {
317 if let Some(ref fail) = task.job.on.fail {
318 if let Ok(sh) = command::spawn_sh(
319 task.id,
320 Status::Terminated,
321 &task.job_name,
322 &log_file_c,
323 ) {
324 let _ =
325 command::process(sh, &log_file_c, Some(fail.clone()), true).await;
326 }
327 }
328 })
329 .await;
330 let _ = processor::terminate(task.job_name.clone(), Some(task.id)).await;
331 }));
332 if let Some(ref url) = self.job.git.url {
333 self.set_status(Status::Fetching).await;
335 let branch = &self.job.git.branch;
336 msg!(format!("Fetching {url} branch {branch}"));
337 let input = if task_work_dir.exists() {
338 format!(
339 r#"
340cd "{wdir}"
341git remote set-url origin "{url}"
342git pull origin "{branch}"
343git checkout "{branch}"
344"#,
345 )
346 } else {
347 format!(
348 r#"
349git clone "{url}" "{wdir}"
350cd "{wdir}"
351git checkout "{branch}"
352"#
353 )
354 };
355 self.process(input, &lg).await?;
356 } else {
357 fs::create_dir_all(&task_work_dir).await?;
358 }
359 if let Some(ref build) = self.job.commands.build {
360 self.set_status(Status::Building).await;
361 msg!("Building");
362 process_stage!(build);
363 }
364 if let Some(ref test) = self.job.commands.test {
365 self.set_status(Status::Testing).await;
366 msg!("Testing");
367 process_stage!(test);
368 }
369 if let Some(ref release) = self.job.commands.release {
370 self.set_status(Status::Releasing).await;
371 msg!("Releasing");
372 process_stage!(release);
373 }
374 self.run_fut.lock().take();
375 self.set_status(Status::Completed).await;
376 msg!("TASK COMPLETED");
377 if let Some(ref success) = self.job.on.success {
378 process_stage!(success);
379 }
380 if let Some(fut) = self.timeout_fut.lock().take() {
381 fut.abort();
382 }
383 let _ = processor::mark_finished(self.job_name.clone(), self.id).await;
384 Ok(())
385 }
386 #[allow(clippy::too_many_lines)]
387 pub async fn run(self: Arc<Self>) {
388 let task = self.clone();
389 let fut = tokio::spawn(async move {
390 let _ = task.run_task().await;
391 });
392 self.run_fut.lock().replace(fut);
393 }
394 pub fn terminate(&self) {
398 if let Some(fut) = self.timeout_fut.lock().take() {
399 fut.abort();
400 }
401 if let Some(fut) = self.run_fut.lock().take() {
402 fut.abort();
403 let id = self.id;
404 *self.status.lock() = Status::Terminated;
405 tokio::spawn(async move {
406 db::set_task_status(id, Status::Terminated).await.unwrap();
407 });
408 } else if !self.started.load(atomic::Ordering::SeqCst) {
409 let id = self.id;
410 *self.status.lock() = Status::Canceled;
411 tokio::spawn(async move {
412 db::set_task_status(id, Status::Canceled).await.unwrap();
413 });
414 }
415 if let Some(pid) = self.pid.lock().take() {
416 tokio::spawn(async move {
417 bmart::process::kill_pstree(pid, Some(tki()), true).await;
418 });
419 }
420 }
421 pub async fn terminate_wait(&self) {
425 if let Some(fut) = self.timeout_fut.lock().take() {
426 fut.abort();
427 }
428 let run_fut = self.run_fut.lock().take();
429 if let Some(fut) = run_fut {
430 fut.abort();
431 let id = self.id;
432 *self.status.lock() = Status::Terminated;
433 db::set_task_status(id, Status::Terminated).await.unwrap();
434 } else if !self.started.load(atomic::Ordering::SeqCst) {
435 let id = self.id;
436 *self.status.lock() = Status::Canceled;
437 db::set_task_status(id, Status::Canceled).await.unwrap();
438 }
439 if let Some(pid) = self.pid.lock().take() {
440 tokio::spawn(async move {
441 bmart::process::kill_pstree(pid, Some(tki()), true).await;
442 });
443 }
444 }
445 pub fn id(&self) -> i64 {
446 self.id
447 }
448 pub fn job_name(&self) -> &str {
449 &self.job_name
450 }
451 }
452
453 impl Drop for Task {
454 fn drop(&mut self) {
455 self.terminate();
456 }
457 }
458
459 impl fmt::Display for Task {
460 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
461 write!(f, "{} (job {}, user {})", self.id, self.job_name, self.user)
462 }
463 }
464
465 pub enum User {
466 Authenticated(String),
467 Triggered(Trigger),
468 }
469
470 impl User {
471 pub fn admin() -> Self {
472 Self::authenticated("admin")
473 }
474 pub fn authenticated(user: &str) -> Self {
475 Self::Authenticated(user.to_owned())
476 }
477 pub fn triggered(source: &str, user: &str) -> Self {
478 Self::Triggered(Trigger {
479 source: source.to_owned(),
480 user: user.to_owned(),
481 })
482 }
483 }
484
485 impl fmt::Display for User {
486 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
487 match self {
488 User::Authenticated(user) => write!(f, "{}", user),
489 User::Triggered(t) => write!(f, "!trig.{}.{}", t.source, t.user),
490 }
491 }
492 }
493
494 pub struct Trigger {
495 source: String,
496 user: String,
497 }
498}