1use chrono::NaiveDateTime;
2use sqlx::migrate::MigrateError;
3use sqlx::{error::Error as DBError, Pool, Sqlite, SqlitePool};
4use std::num::NonZeroU64;
5use std::path::{Path, PathBuf};
6use std::sync::{Arc, RwLock};
7
8mod db;
9mod event;
10pub mod migration;
11pub use event::*;
12
13const ZERO: i64 = 0;
14const EMPTY_STR: &str = "";
15
16const EXEC_CODE: i8 = EventType::Exec.get_code();
17const EXEC_DONE_CODE: i8 = EventType::ExecDone.get_code();
18
19#[derive(Debug, Clone)]
20pub struct Historian {
21 pool: Arc<RwLock<SqlitePool>>,
22 dir_path: PathBuf,
23}
24
25async fn raw_record_event(pool: &Pool<Sqlite>, event: DBEvent<'_>) -> Result<i64, DBError> {
26 let res = sqlx::query!(
27 "
28 INSERT INTO events
29 (script_id, type, cmd, args, content, time, main_event_id, dir, envs, humble)
30 VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
31 RETURNING id
32 ",
33 event.script_id,
34 event.ty,
35 event.cmd,
36 event.args,
37 event.content,
38 event.time,
39 event.main_event_id,
40 event.dir,
41 event.envs,
42 event.humble
43 )
44 .fetch_one(pool)
45 .await?;
46 Ok(res.id)
47}
48
49#[derive(Clone, Copy)]
50struct DBEvent<'a> {
51 script_id: i64,
52 ty: i8,
53 cmd: &'a str,
54 time: NaiveDateTime,
55 args: Option<&'a str>,
56 dir: Option<&'a str>,
57 envs: Option<&'a str>,
58 content: Option<&'a str>,
59 humble: bool,
60 main_event_id: i64,
61}
62impl<'a> DBEvent<'a> {
63 fn new(script_id: i64, time: NaiveDateTime, ty: i8, cmd: &'a str, humble: bool) -> Self {
64 DBEvent {
65 script_id,
66 time,
67 ty,
68 cmd,
69 humble,
70 main_event_id: ZERO,
71 envs: None,
72 content: None,
73 args: None,
74 dir: None,
75 }
76 }
77 fn args(mut self, value: &'a str) -> Self {
78 self.args = Some(value);
79 self
80 }
81 fn dir(mut self, value: &'a str) -> Self {
82 self.dir = Some(value);
83 self
84 }
85 fn content(mut self, value: &'a str) -> Self {
86 self.content = Some(value);
87 self
88 }
89 fn envs(mut self, value: &'a str) -> Self {
90 self.envs = Some(value);
91 self
92 }
93 fn humble(mut self) -> Self {
94 self.humble = true;
95 self
96 }
97 fn main_event_id(mut self, value: i64) -> Self {
98 self.main_event_id = value;
99 self
100 }
101}
102
/// Assembles the compile-time-checked query that lists the most recent
/// distinct argument sets of non-ignored `Exec` events.
///
/// Positional inputs, in order: the selected column list, the OFFSET value,
/// the LIMIT value, extra GROUP BY columns (appended after `script_id`), one
/// or more `+`-joined literals appended to the WHERE clause, and finally the
/// expressions bound to the placeholders inside those literals.
///
/// Bind order is `EXEC_CODE`, the caller's expressions, LIMIT, OFFSET.
macro_rules! last_arg {
    ($columns:literal, $skip:expr, $take:expr, $extra_group:literal, $filter:literal $(+ $filter_rest:literal)* , $($bind:expr),*) => {{
        sqlx::query!(
            "
            WITH args AS (
                SELECT " + $columns + ", max(time) as time FROM events
                WHERE type = ? AND NOT ignored "
            +
            $filter
            $(+ $filter_rest)*
            +
            " GROUP BY script_id " + $extra_group + " ORDER BY time DESC LIMIT ? OFFSET ?
            ) SELECT "
            + $columns
            + " FROM args
            ",
            EXEC_CODE,
            $($bind, )*
            $take,
            $skip,
        )
    }};
}
/// Runs a `last_arg!` query for the given script ids and evaluates to the
/// `Result` of `fetch_all`.
///
/// Inputs, in order: selected columns, extra GROUP BY columns, the slice of
/// script ids, limit, offset, the `no_humble` flag (when true, humble events
/// are filtered out), an optional directory filter, and the `Historian`.
///
/// The pool handle is cloned out of the lock before the query is awaited, so
/// no `std::sync::RwLock` guard is held across the `.await`.
///
/// Panics if the pool lock is poisoned.
macro_rules! do_last_arg {
    ($select:literal, $group_by:literal, $ids:expr, $limit:expr, $offset:expr, $no_humble:expr, $dir:expr, $historian:expr) => {{
        let ids = join_id_str($ids);
        log::info!("查詢歷史 {}", ids);
        let limit = $limit as i64;
        let offset = $offset as i64;
        // With no directory given, the `? OR dir = ?` clause matches every row.
        let no_dir = $dir.is_none();
        let dir = $dir.map(|p| p.to_string_lossy());
        let dir = dir.as_deref().unwrap_or(EMPTY_STR);
        // Cloning the pool is a cheap handle copy; the guard is dropped at the
        // end of this statement, before anything is awaited.
        let pool = $historian.pool.read().unwrap().clone();
        let rows = last_arg!(
            $select,
            offset,
            limit,
            $group_by,
            "
            AND instr(?, '[' || script_id || ']') > 0 AND (? OR dir = ?)
            AND (NOT ? OR NOT humble)
            ",
            ids,
            no_dir,
            dir,
            $no_humble
        )
        .fetch_all(&pool)
        .await;
        rows
    }};
}
154
/// Sets a boolean flag column (`$flag`) to true on the matching `Exec` events
/// and on the `ExecDone` events that point at them.
///
/// Inputs, in order: the column name literal, an expression that dereferences
/// to the pool, one or more `+`-joined literals forming the row predicate,
/// and the expressions bound to the predicate's placeholders.
///
/// Expands to two awaited statements that use `?`, so it must be invoked in
/// statement position inside an async function returning a compatible
/// `Result`.
macro_rules! ignore_or_humble_arg {
    ($flag:literal, $pool:expr, $filter:literal $(+ $filter_rest:literal)*, $($bind:expr),+) => {
        // The dependent `ExecDone` rows go first: their sub-select only sees
        // `Exec` rows that are still `NOT ignored`, which would no longer
        // match once the second statement has flagged them as ignored.
        sqlx::query!(
            "
            UPDATE events SET " + $flag + " = true
            WHERE type = ? AND main_event_id IN (
                SELECT id FROM events WHERE type = ? AND NOT ignored AND "
            + $filter $(+ $filter_rest)*
            + "
            )
            ",
            EXEC_DONE_CODE,
            EXEC_CODE,
            $($bind),*
        )
        .execute(&*$pool)
        .await?;

        // Then the `Exec` rows themselves.
        sqlx::query!(
            "
            UPDATE events SET " + $flag + " = true
            WHERE type = ? AND NOT ignored AND
            "
            + $filter $(+ $filter_rest)*,
            EXEC_CODE,
            $($bind),*
        )
        .execute(&*$pool)
        .await?;
    };
}
186
187#[derive(Debug)]
188pub struct LastTimeRecord {
189 pub script_id: i64,
190 pub exec_time: Option<NaiveDateTime>,
191 pub exec_done_time: Option<NaiveDateTime>,
192 pub humble_time: Option<NaiveDateTime>,
193}
194
195impl Historian {
196 pub async fn close(self) {
197 log::info!("close the historian database");
198 if let Ok(pool) = self.pool.read() {
199 pool.close().await;
200 }
201 }
202 async fn raw_record(&self, event: DBEvent<'_>) -> Result<i64, DBError> {
203 let pool = &mut *self.pool.write().unwrap();
204 let res = raw_record_event(pool, event).await;
205 if res.is_err() {
206 pool.close().await;
207 log::warn!("資料庫錯誤 {:?},再試最後一次!", res);
208 *pool = db::get_pool(&self.dir_path).await?;
209 return raw_record_event(pool, event).await;
210 }
211
212 res
213 }
214 pub async fn new(dir_path: PathBuf) -> Result<Self, DBError> {
215 db::get_pool(&dir_path).await.map(|pool| Historian {
216 pool: Arc::new(RwLock::new(pool)),
217 dir_path,
218 })
219 }
220 pub async fn do_migrate(dir_path: &Path) -> Result<(), MigrateError> {
221 migration::do_migrate(db::get_file(dir_path)).await?;
222 Ok(())
223 }
224
225 pub async fn remove(&self, script_id: i64) -> Result<(), DBError> {
226 let pool = self.pool.read().unwrap();
227 sqlx::query!("DELETE FROM events WHERE script_id = ?", script_id,)
228 .execute(&*pool)
229 .await?;
230 Ok(())
231 }
232
233 pub async fn record(&self, event: &Event<'_>) -> Result<i64, DBError> {
234 log::debug!("記錄事件 {:?}", event);
235 let ty = event.data.get_type().get_code();
236 let cmd = std::env::args().collect::<Vec<_>>().join(" ");
237 let mut db_event = DBEvent::new(event.script_id, event.time, ty, &cmd, event.humble);
238 let id = match &event.data {
239 EventData::Write | EventData::Read => self.raw_record(db_event).await?,
240 EventData::Exec {
241 content,
242 args,
243 envs,
244 dir,
245 } => {
246 let mut content = Some(*content);
247 let last_event = sqlx::query!(
248 "
249 SELECT content FROM events
250 WHERE type = ? AND script_id = ? AND NOT content IS NULL
251 ORDER BY time DESC LIMIT 1
252 ",
253 ty,
254 event.script_id
255 )
256 .fetch_optional(&*self.pool.read().unwrap())
257 .await?;
258 if let Some(last_event) = last_event {
259 if last_event.content.as_deref() == content {
260 log::debug!("上次執行內容相同,不重複記錄");
261 content = None;
262 }
263 }
264 db_event.content = content;
265 let dir = dir.map(|p| p.to_string_lossy()).unwrap_or_default();
266 self.raw_record(db_event.envs(envs).dir(dir.as_ref()).args(args))
267 .await?
268 }
269 EventData::ExecDone {
270 code,
271 main_event_id,
272 } => {
273 let main_event = sqlx::query!(
274 "SELECT ignored, humble FROM events WHERE type = ? AND id = ?",
275 EXEC_CODE,
276 main_event_id
277 )
278 .fetch_optional(&*self.pool.read().unwrap())
279 .await?;
280 let main_event = match main_event {
281 Some(e) => e,
282 None => {
283 log::warn!("找不到主要事件,可能被 tidy 掉了");
284 return Ok(ZERO);
285 }
286 };
287 if main_event.ignored {
288 return Ok(ZERO);
289 } else if main_event.humble {
290 log::debug!("謙卑地執行完畢了");
291 db_event = db_event.humble();
292 }
293
294 let code = code.to_string();
295 let id = self
296 .raw_record(db_event.content(&code).main_event_id(*main_event_id))
297 .await?;
298
299 if db_event.humble {
300 ZERO
302 } else {
303 id
304 }
305 }
306 };
307 Ok(id)
308 }
309
310 pub async fn previous_args(
311 &self,
312 id: i64,
313 dir: Option<&Path>,
314 ) -> Result<Option<(String, String)>, DBError> {
315 let no_dir = dir.is_none();
316 let dir = dir.map(|p| p.to_string_lossy());
317 let dir = dir.as_deref().unwrap_or(EMPTY_STR);
318 let res = sqlx::query!(
319 "
320 SELECT args, envs FROM events
321 WHERE type = ? AND script_id = ? AND NOT ignored
322 AND (? OR dir = ?)
323 ORDER BY time DESC LIMIT 1
324 ",
325 EXEC_CODE,
326 id,
327 no_dir,
328 dir
329 )
330 .fetch_optional(&*self.pool.read().unwrap())
331 .await?;
332 Ok(res.map(|res| (res.args.unwrap_or_default(), res.envs.unwrap_or_default())))
333 }
334
335 pub async fn previous_args_list(
336 &self,
337 ids: &[i64],
338 limit: u32,
339 offset: u32,
340 no_humble: bool,
341 dir: Option<&Path>,
342 ) -> Result<impl ExactSizeIterator<Item = (i64, String)>, DBError> {
343 let res = do_last_arg!(
344 "script_id, args",
345 ", args",
346 ids,
347 limit,
348 offset,
349 no_humble,
350 dir,
351 self
352 )?;
353 Ok(res.into_iter().map(|res| {
354 (
355 res.script_id.unwrap_or_default(),
356 res.args.unwrap_or_default(),
357 )
358 }))
359 }
360
361 pub async fn previous_args_list_with_envs(
362 &self,
363 ids: &[i64],
364 limit: u32,
365 offset: u32,
366 no_humble: bool,
367 dir: Option<&Path>,
368 ) -> Result<impl ExactSizeIterator<Item = (i64, String, String)>, DBError> {
369 let res = do_last_arg!(
370 "script_id, args, envs",
371 ", args, envs",
372 ids,
373 limit,
374 offset,
375 no_humble,
376 dir,
377 self
378 )?;
379 Ok(res.into_iter().map(|res| {
380 (
381 res.script_id.unwrap_or_default(),
382 res.args.unwrap_or_default(),
383 res.envs.unwrap_or_default(),
384 )
385 }))
386 }
387
388 pub async fn previous_args_list_only_envs(
389 &self,
390 ids: &[i64],
391 limit: u32,
392 offset: u32,
393 no_humble: bool,
394 dir: Option<&Path>,
395 ) -> Result<impl ExactSizeIterator<Item = (i64, String)>, DBError> {
396 let res = do_last_arg!(
397 "script_id, envs",
398 ", envs",
399 ids,
400 limit,
401 offset,
402 no_humble,
403 dir,
404 self
405 )?;
406 Ok(res.into_iter().map(|res| {
407 (
408 res.script_id.unwrap_or_default(),
409 res.envs.unwrap_or_default(),
410 )
411 }))
412 }
413 async fn make_last_time_record(&self, script_id: i64) -> Result<LastTimeRecord, DBError> {
414 let res = sqlx::query_as_unchecked!(
415 LastTimeRecord,
416 "
417 SELECT
418 ? as script_id,
419 (SELECT time FROM events
420 WHERE script_id = ? AND NOT ignored AND humble
421 ORDER BY time DESC LIMIT 1) as humble_time,
422 (SELECT time FROM events
423 WHERE script_id = ? AND NOT ignored AND NOT humble AND type = ?
424 ORDER BY time DESC LIMIT 1) as exec_time,
425 (SELECT time FROM events
426 WHERE script_id = ? AND NOT ignored AND NOT humble AND type = ?
427 ORDER BY time DESC LIMIT 1) as exec_done_time
428 ",
429 script_id,
430 script_id,
431 script_id,
432 EXEC_CODE,
433 script_id,
434 EXEC_DONE_CODE
435 )
436 .fetch_one(&*self.pool.read().unwrap())
437 .await?;
438
439 Ok(LastTimeRecord {
440 script_id,
441 exec_time: res.exec_time,
442 exec_done_time: res.exec_done_time,
443 humble_time: res.humble_time,
444 })
445 }
446 pub async fn ignore_args_by_id(
447 &self,
448 event_id: NonZeroU64,
449 ) -> Result<Option<LastTimeRecord>, DBError> {
450 self.process_args_by_id(false, event_id).await
451 }
452 pub async fn humble_args_by_id(
453 &self,
454 event_id: NonZeroU64,
455 ) -> Result<Option<LastTimeRecord>, DBError> {
456 self.process_args_by_id(true, event_id).await
457 }
458 async fn process_args_by_id(
460 &self,
461 is_humble: bool,
462 event_id: NonZeroU64,
463 ) -> Result<Option<LastTimeRecord>, DBError> {
464 let pool = self.pool.read().unwrap();
465 let event_id = event_id.get() as i64;
466 let latest_record = sqlx::query!(
467 "
468 SELECT id, script_id FROM events
469 WHERE type = ? AND script_id = (SELECT script_id FROM events WHERE id = ?)
470 ORDER BY time DESC LIMIT 1
471 ",
472 EXEC_CODE,
473 event_id,
474 )
475 .fetch_one(&*pool)
476 .await?;
477 if is_humble {
480 ignore_or_humble_arg!("humble", pool, "id = ?", event_id);
481 } else {
482 ignore_or_humble_arg!("ignored", pool, "id = ?", event_id);
483 }
484
485 if latest_record.id == event_id {
486 log::info!("process last args");
489 let ret = self.make_last_time_record(latest_record.script_id).await?;
490 return Ok(Some(ret));
491 }
492 Ok(None)
493 }
494 pub async fn ignore_args_range(
495 &self,
496 ids: &[i64],
497 dir: Option<&Path>,
498 no_humble: bool,
499 show_env: bool,
500 show_args: bool,
501 min: NonZeroU64,
502 max: Option<NonZeroU64>,
503 ) -> Result<Vec<LastTimeRecord>, DBError> {
504 let ids_str = join_id_str(ids);
505
506 let offset = min.get() as i64 - 1;
507 let limit = if let Some(max) = max {
508 (max.get() - min.get()) as i64
509 } else {
510 -1
511 };
512 let no_dir = dir.is_none();
513 let dir = dir.map(|p| p.to_string_lossy());
514 let dir = dir.as_deref().unwrap_or(EMPTY_STR);
515 log::info!("忽略歷史 {} {} {}", offset, limit, ids_str);
516
517 let pool = self.pool.read().unwrap();
518 macro_rules! ignore_arg {
519 ($($target:literal)*) => {{
520 ignore_or_humble_arg!(
523 "ignored",
524 pool,
525 "
526 (? OR dir == ?) AND
527 (NOT ? OR NOT humble) AND
528 (script_id " $(+ "||" + $target)* + ") IN (
529 WITH records AS (
530 SELECT max(time) as time, script_id " $(+ "," + $target)* +" FROM events
531 WHERE instr(?, '[' || script_id || ']') > 0
532 AND type = ? AND NOT ignored
533 AND (? OR dir == ?)
534 AND (NOT ? OR NOT humble)
535 GROUP BY script_id " $( + "," + $target)* + " ORDER BY time DESC LIMIT ? OFFSET ?
536 ) SELECT script_id " $(+ "||" + $target)* + " as t FROM records
537 )
538 ",
539 no_dir,
540 dir,
541 no_humble,
542 ids_str,
543 EXEC_CODE,
544 no_dir,
545 dir,
546 no_humble,
547 limit,
548 offset
549 );
550 }};
551 }
552
553 match (show_env, show_args) {
554 (true, true) => ignore_arg!("args" "envs"),
555 (true, false) => ignore_arg!("envs"),
556 (false, true) => ignore_arg!("args"),
557 (false, false) => unreachable!(),
558 }
559
560 log::info!("ignore last args");
561 let mut ret = vec![];
562 for &id in ids {
563 ret.push(self.make_last_time_record(id).await?);
565 }
566 Ok(ret)
567 }
568
569 pub async fn amend_args_by_id(
570 &self,
571 event_id: NonZeroU64,
572 args: &str,
573 envs: Option<&str>,
574 ) -> Result<(), DBError> {
575 let event_id = event_id.get() as i64;
576
577 macro_rules! amend {
578 ($($set:literal, $var:expr),*) => {{
579 sqlx::query!(
580 "UPDATE events SET ignored = false, args = ?"
581 + $( "," + $set + "=? " +)*
582 "WHERE type = ? AND id = ? ",
583 args,
584 $($var,)*
585 EXEC_CODE,
586 event_id,
587 )
588 .execute(&*self.pool.read().unwrap())
589 .await?
590 }}
591 }
592 if let Some(envs) = envs {
593 amend!("envs", envs);
594 } else {
595 amend!();
596 }
597 Ok(())
598 }
599
600 pub async fn clear_except_script_ids(&self, script_ids: &[i64]) -> Result<(), DBError> {
602 let ids = join_id_str(script_ids);
603 let pool = self.pool.read().unwrap();
604 sqlx::query!(
606 "
607 DELETE FROM events
608 WHERE instr(?, '[' || script_id || ']') <= 0
609 ",
610 ids
611 )
612 .execute(&*pool)
613 .await?;
614
615 sqlx::query!("VACUUM").execute(&*pool).await?;
616
617 Ok(())
618 }
619
620 pub async fn tidy(&self, script_id: i64) -> Result<(), DBError> {
621 let pool = self.pool.read().unwrap();
622 sqlx::query!(
623 "
624 DELETE FROM events
625 WHERE script_id = ?
626 AND id NOT IN (
627 SELECT id FROM
628 (
629 SELECT id, MAX(time) FROM events
630 WHERE script_id = ?
631 AND type = ?
632 AND NOT ignored
633 GROUP BY args, dir
634 )
635 )
636 ",
637 script_id,
638 script_id,
639 EXEC_CODE,
640 )
641 .execute(&*pool)
642 .await?;
643
644 Ok(())
645 }
646}
647
/// Renders the ids as a bracketed, separator-free list such as `[1][23]`.
///
/// The SQL in this file tests membership against this string with
/// `instr(?, '[' || script_id || ']')`. An empty slice yields an empty string.
fn join_id_str(ids: &[i64]) -> String {
    ids.iter().map(|id| format!("[{}]", id)).collect()
}