hyper_scripter_historian/
lib.rs

1use chrono::NaiveDateTime;
2use sqlx::migrate::MigrateError;
3use sqlx::{error::Error as DBError, Pool, Sqlite, SqlitePool};
4use std::num::NonZeroU64;
5use std::path::{Path, PathBuf};
6use std::sync::{Arc, RwLock};
7
8mod db;
9mod event;
10pub mod migration;
11pub use event::*;
12
13const ZERO: i64 = 0;
14const EMPTY_STR: &str = "";
15
16const EXEC_CODE: i8 = EventType::Exec.get_code();
17const EXEC_DONE_CODE: i8 = EventType::ExecDone.get_code();
18
19#[derive(Debug, Clone)]
20pub struct Historian {
21    pool: Arc<RwLock<SqlitePool>>,
22    dir_path: PathBuf,
23}
24
25async fn raw_record_event(pool: &Pool<Sqlite>, event: DBEvent<'_>) -> Result<i64, DBError> {
26    let res = sqlx::query!(
27        "
28        INSERT INTO events
29        (script_id, type, cmd, args, content, time, main_event_id, dir, envs, humble)
30        VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
31        RETURNING id
32        ",
33        event.script_id,
34        event.ty,
35        event.cmd,
36        event.args,
37        event.content,
38        event.time,
39        event.main_event_id,
40        event.dir,
41        event.envs,
42        event.humble
43    )
44    .fetch_one(pool)
45    .await?;
46    Ok(res.id)
47}
48
49#[derive(Clone, Copy)]
50struct DBEvent<'a> {
51    script_id: i64,
52    ty: i8,
53    cmd: &'a str,
54    time: NaiveDateTime,
55    args: Option<&'a str>,
56    dir: Option<&'a str>,
57    envs: Option<&'a str>,
58    content: Option<&'a str>,
59    humble: bool,
60    main_event_id: i64,
61}
62impl<'a> DBEvent<'a> {
63    fn new(script_id: i64, time: NaiveDateTime, ty: i8, cmd: &'a str, humble: bool) -> Self {
64        DBEvent {
65            script_id,
66            time,
67            ty,
68            cmd,
69            humble,
70            main_event_id: ZERO,
71            envs: None,
72            content: None,
73            args: None,
74            dir: None,
75        }
76    }
77    fn args(mut self, value: &'a str) -> Self {
78        self.args = Some(value);
79        self
80    }
81    fn dir(mut self, value: &'a str) -> Self {
82        self.dir = Some(value);
83        self
84    }
85    fn content(mut self, value: &'a str) -> Self {
86        self.content = Some(value);
87        self
88    }
89    fn envs(mut self, value: &'a str) -> Self {
90        self.envs = Some(value);
91        self
92    }
93    fn humble(mut self) -> Self {
94        self.humble = true;
95        self
96    }
97    fn main_event_id(mut self, value: i64) -> Self {
98        self.main_event_id = value;
99        self
100    }
101}
102
103macro_rules! last_arg {
104    ($select:literal, $offset:expr, $limit:expr, $group_by:literal, $where:literal $(+ $more_where:literal)* , $($var:expr),*) => {{
105        sqlx::query!(
106            "
107            WITH args AS (
108                SELECT " + $select + ", max(time) as time FROM events
109                WHERE type = ? AND NOT ignored "
110                +
111                $where
112                $(+ $more_where)*
113                +
114                " GROUP BY script_id " + $group_by + " ORDER BY time DESC LIMIT ? OFFSET ?
115            ) SELECT "
116                + $select
117                + " FROM args
118            ",
119            EXEC_CODE,
120            $($var, )*
121            $limit,
122            $offset,
123        )
124    }};
125}
126macro_rules! do_last_arg {
127    ($select:literal, $group_by:literal, $ids:expr, $limit:expr, $offset:expr, $no_humble:expr, $dir:expr, $historian:expr) => {{
128        let ids = join_id_str($ids);
129        log::info!("查詢歷史 {}", ids);
130        let limit = $limit as i64;
131        let offset = $offset as i64;
132        let no_dir = $dir.is_none();
133        let dir = $dir.map(|p| p.to_string_lossy());
134        let dir = dir.as_deref().unwrap_or(EMPTY_STR);
135        // FIXME: 一旦可以綁定陣列就換掉這個醜死人的 instr
136        last_arg!(
137            $select,
138            offset,
139            limit,
140            $group_by,
141            "
142            AND instr(?, '[' || script_id || ']') > 0 AND (? OR dir = ?)
143            AND (NOT ? OR NOT humble)
144            ",
145            ids,
146            no_dir,
147            dir,
148            $no_humble
149        )
150        .fetch_all(&*$historian.pool.read().unwrap())
151        .await
152    }};
153}
154
155macro_rules! ignore_or_humble_arg {
156    ($ignore_or_humble:literal, $pool:expr, $cond:literal $(+ $more_cond:literal)*, $($var:expr),+) => {
157        sqlx::query!(
158            "
159            UPDATE events SET " + $ignore_or_humble + " = true
160            WHERE type = ? AND main_event_id IN (
161                SELECT id FROM events WHERE type = ? AND NOT ignored AND "
162                + $cond $(+ $more_cond)*
163                + "
164            )
165            ",
166            EXEC_DONE_CODE,
167            EXEC_CODE,
168            $($var),*
169        )
170        .execute(&*$pool)
171        .await?;
172
173        sqlx::query!(
174            "
175            UPDATE events SET " + $ignore_or_humble + " = true
176            WHERE type = ? AND NOT ignored AND
177            "
178                + $cond $(+ $more_cond)*,
179            EXEC_CODE,
180            $($var),*
181        )
182        .execute(&*$pool)
183        .await?;
184    };
185}
186
187#[derive(Debug)]
188pub struct LastTimeRecord {
189    pub script_id: i64,
190    pub exec_time: Option<NaiveDateTime>,
191    pub exec_done_time: Option<NaiveDateTime>,
192    pub humble_time: Option<NaiveDateTime>,
193}
194
195impl Historian {
196    pub async fn close(self) {
197        log::info!("close the historian database");
198        if let Ok(pool) = self.pool.read() {
199            pool.close().await;
200        }
201    }
202    async fn raw_record(&self, event: DBEvent<'_>) -> Result<i64, DBError> {
203        let pool = &mut *self.pool.write().unwrap();
204        let res = raw_record_event(pool, event).await;
205        if res.is_err() {
206            pool.close().await;
207            log::warn!("資料庫錯誤 {:?},再試最後一次!", res);
208            *pool = db::get_pool(&self.dir_path).await?;
209            return raw_record_event(pool, event).await;
210        }
211
212        res
213    }
214    pub async fn new(dir_path: PathBuf) -> Result<Self, DBError> {
215        db::get_pool(&dir_path).await.map(|pool| Historian {
216            pool: Arc::new(RwLock::new(pool)),
217            dir_path,
218        })
219    }
220    pub async fn do_migrate(dir_path: &Path) -> Result<(), MigrateError> {
221        migration::do_migrate(db::get_file(dir_path)).await?;
222        Ok(())
223    }
224
225    pub async fn remove(&self, script_id: i64) -> Result<(), DBError> {
226        let pool = self.pool.read().unwrap();
227        sqlx::query!("DELETE FROM events WHERE script_id = ?", script_id,)
228            .execute(&*pool)
229            .await?;
230        Ok(())
231    }
232
233    pub async fn record(&self, event: &Event<'_>) -> Result<i64, DBError> {
234        log::debug!("記錄事件 {:?}", event);
235        let ty = event.data.get_type().get_code();
236        let cmd = std::env::args().collect::<Vec<_>>().join(" ");
237        let mut db_event = DBEvent::new(event.script_id, event.time, ty, &cmd, event.humble);
238        let id = match &event.data {
239            EventData::Write | EventData::Read => self.raw_record(db_event).await?,
240            EventData::Exec {
241                content,
242                args,
243                envs,
244                dir,
245            } => {
246                let mut content = Some(*content);
247                let last_event = sqlx::query!(
248                    "
249                    SELECT content FROM events
250                    WHERE type = ? AND script_id = ? AND NOT content IS NULL
251                    ORDER BY time DESC LIMIT 1
252                    ",
253                    ty,
254                    event.script_id
255                )
256                .fetch_optional(&*self.pool.read().unwrap())
257                .await?;
258                if let Some(last_event) = last_event {
259                    if last_event.content.as_deref() == content {
260                        log::debug!("上次執行內容相同,不重複記錄");
261                        content = None;
262                    }
263                }
264                db_event.content = content;
265                let dir = dir.map(|p| p.to_string_lossy()).unwrap_or_default();
266                self.raw_record(db_event.envs(envs).dir(dir.as_ref()).args(args))
267                    .await?
268            }
269            EventData::ExecDone {
270                code,
271                main_event_id,
272            } => {
273                let main_event = sqlx::query!(
274                    "SELECT ignored, humble FROM events WHERE type = ? AND id = ?",
275                    EXEC_CODE,
276                    main_event_id
277                )
278                .fetch_optional(&*self.pool.read().unwrap())
279                .await?;
280                let main_event = match main_event {
281                    Some(e) => e,
282                    None => {
283                        log::warn!("找不到主要事件,可能被 tidy 掉了");
284                        return Ok(ZERO);
285                    }
286                };
287                if main_event.ignored {
288                    return Ok(ZERO);
289                } else if main_event.humble {
290                    log::debug!("謙卑地執行完畢了");
291                    db_event = db_event.humble();
292                }
293
294                let code = code.to_string();
295                let id = self
296                    .raw_record(db_event.content(&code).main_event_id(*main_event_id))
297                    .await?;
298
299                if db_event.humble {
300                    // XXX: 用很怪異的方式告訴外面的人不要記錄最新時間,醜死
301                    ZERO
302                } else {
303                    id
304                }
305            }
306        };
307        Ok(id)
308    }
309
310    pub async fn previous_args(
311        &self,
312        id: i64,
313        dir: Option<&Path>,
314    ) -> Result<Option<(String, String)>, DBError> {
315        let no_dir = dir.is_none();
316        let dir = dir.map(|p| p.to_string_lossy());
317        let dir = dir.as_deref().unwrap_or(EMPTY_STR);
318        let res = sqlx::query!(
319            "
320            SELECT args, envs FROM events
321            WHERE type = ? AND script_id = ? AND NOT ignored
322            AND (? OR dir = ?)
323            ORDER BY time DESC LIMIT 1
324            ",
325            EXEC_CODE,
326            id,
327            no_dir,
328            dir
329        )
330        .fetch_optional(&*self.pool.read().unwrap())
331        .await?;
332        Ok(res.map(|res| (res.args.unwrap_or_default(), res.envs.unwrap_or_default())))
333    }
334
335    pub async fn previous_args_list(
336        &self,
337        ids: &[i64],
338        limit: u32,
339        offset: u32,
340        no_humble: bool,
341        dir: Option<&Path>,
342    ) -> Result<impl ExactSizeIterator<Item = (i64, String)>, DBError> {
343        let res = do_last_arg!(
344            "script_id, args",
345            ", args",
346            ids,
347            limit,
348            offset,
349            no_humble,
350            dir,
351            self
352        )?;
353        Ok(res.into_iter().map(|res| {
354            (
355                res.script_id.unwrap_or_default(),
356                res.args.unwrap_or_default(),
357            )
358        }))
359    }
360
361    pub async fn previous_args_list_with_envs(
362        &self,
363        ids: &[i64],
364        limit: u32,
365        offset: u32,
366        no_humble: bool,
367        dir: Option<&Path>,
368    ) -> Result<impl ExactSizeIterator<Item = (i64, String, String)>, DBError> {
369        let res = do_last_arg!(
370            "script_id, args, envs",
371            ", args, envs",
372            ids,
373            limit,
374            offset,
375            no_humble,
376            dir,
377            self
378        )?;
379        Ok(res.into_iter().map(|res| {
380            (
381                res.script_id.unwrap_or_default(),
382                res.args.unwrap_or_default(),
383                res.envs.unwrap_or_default(),
384            )
385        }))
386    }
387
388    pub async fn previous_args_list_only_envs(
389        &self,
390        ids: &[i64],
391        limit: u32,
392        offset: u32,
393        no_humble: bool,
394        dir: Option<&Path>,
395    ) -> Result<impl ExactSizeIterator<Item = (i64, String)>, DBError> {
396        let res = do_last_arg!(
397            "script_id, envs",
398            ", envs",
399            ids,
400            limit,
401            offset,
402            no_humble,
403            dir,
404            self
405        )?;
406        Ok(res.into_iter().map(|res| {
407            (
408                res.script_id.unwrap_or_default(),
409                res.envs.unwrap_or_default(),
410            )
411        }))
412    }
413    async fn make_last_time_record(&self, script_id: i64) -> Result<LastTimeRecord, DBError> {
414        let res = sqlx::query_as_unchecked!(
415            LastTimeRecord,
416            "
417            SELECT
418                ? as script_id,
419                (SELECT time FROM events
420                WHERE script_id = ? AND NOT ignored AND humble
421                ORDER BY time DESC LIMIT 1) as humble_time,
422                (SELECT time FROM events
423                WHERE script_id = ? AND NOT ignored AND NOT humble AND type = ?
424                ORDER BY time DESC LIMIT 1) as exec_time,
425                (SELECT time FROM events
426                WHERE script_id = ? AND NOT ignored AND NOT humble AND type = ?
427                ORDER BY time DESC LIMIT 1) as exec_done_time
428            ",
429            script_id,
430            script_id,
431            script_id,
432            EXEC_CODE,
433            script_id,
434            EXEC_DONE_CODE
435        )
436        .fetch_one(&*self.pool.read().unwrap())
437        .await?;
438
439        Ok(LastTimeRecord {
440            script_id,
441            exec_time: res.exec_time,
442            exec_done_time: res.exec_done_time,
443            humble_time: res.humble_time,
444        })
445    }
446    pub async fn ignore_args_by_id(
447        &self,
448        event_id: NonZeroU64,
449    ) -> Result<Option<LastTimeRecord>, DBError> {
450        self.process_args_by_id(false, event_id).await
451    }
452    pub async fn humble_args_by_id(
453        &self,
454        event_id: NonZeroU64,
455    ) -> Result<Option<LastTimeRecord>, DBError> {
456        self.process_args_by_id(true, event_id).await
457    }
458    /// humble or ignore
459    async fn process_args_by_id(
460        &self,
461        is_humble: bool,
462        event_id: NonZeroU64,
463    ) -> Result<Option<LastTimeRecord>, DBError> {
464        let pool = self.pool.read().unwrap();
465        let event_id = event_id.get() as i64;
466        let latest_record = sqlx::query!(
467            "
468            SELECT id, script_id FROM events
469            WHERE type = ? AND script_id = (SELECT script_id FROM events WHERE id = ?)
470            ORDER BY time DESC LIMIT 1
471            ",
472            EXEC_CODE,
473            event_id,
474        )
475        .fetch_one(&*pool)
476        .await?;
477        // TODO: check if this event is exec?
478
479        if is_humble {
480            ignore_or_humble_arg!("humble", pool, "id = ?", event_id);
481        } else {
482            ignore_or_humble_arg!("ignored", pool, "id = ?", event_id);
483        }
484
485        if latest_record.id == event_id {
486            // NOTE: 若 event_id 為最新但已被 ignored/humble,仍會被抓成 last_record 並進入這裡
487            // 但應該不致於有太大的效能問題
488            log::info!("process last args");
489            let ret = self.make_last_time_record(latest_record.script_id).await?;
490            return Ok(Some(ret));
491        }
492        Ok(None)
493    }
494    pub async fn ignore_args_range(
495        &self,
496        ids: &[i64],
497        dir: Option<&Path>,
498        no_humble: bool,
499        show_env: bool,
500        show_args: bool,
501        min: NonZeroU64,
502        max: Option<NonZeroU64>,
503    ) -> Result<Vec<LastTimeRecord>, DBError> {
504        let ids_str = join_id_str(ids);
505
506        let offset = min.get() as i64 - 1;
507        let limit = if let Some(max) = max {
508            (max.get() - min.get()) as i64
509        } else {
510            -1
511        };
512        let no_dir = dir.is_none();
513        let dir = dir.map(|p| p.to_string_lossy());
514        let dir = dir.as_deref().unwrap_or(EMPTY_STR);
515        log::info!("忽略歷史 {} {} {}", offset, limit, ids_str);
516
517        let pool = self.pool.read().unwrap();
518        macro_rules! ignore_arg {
519            ($($target:literal)*) => {{
520                // NOTE: 我們知道 script_id || args 串接起來必然是唯一的(因為 args 的格式為 [...])
521                // FIXME: 一旦可以綁定陣列就換掉這個醜死人的 instr
522                ignore_or_humble_arg!(
523                    "ignored",
524                    pool,
525                    "
526                    (? OR dir == ?) AND
527                    (NOT ? OR NOT humble) AND
528                    (script_id " $(+ "||" + $target)* + ") IN (
529                        WITH records AS (
530                            SELECT max(time) as time, script_id " $(+ "," + $target)* +" FROM events
531                            WHERE instr(?, '[' || script_id || ']') > 0
532                            AND type = ? AND NOT ignored
533                            AND (? OR dir == ?)
534                            AND (NOT ? OR NOT humble)
535                            GROUP BY script_id " $( + "," + $target)* + " ORDER BY time DESC LIMIT ? OFFSET ?
536                        ) SELECT script_id " $(+ "||" + $target)* + " as t FROM records
537                    )
538                    ",
539                    no_dir,
540                    dir,
541                    no_humble,
542                    ids_str,
543                    EXEC_CODE,
544                    no_dir,
545                    dir,
546                    no_humble,
547                    limit,
548                    offset
549                );
550            }};
551        }
552
553        match (show_env, show_args) {
554            (true, true) => ignore_arg!("args" "envs"),
555            (true, false) => ignore_arg!("envs"),
556            (false, true) => ignore_arg!("args"),
557            (false, false) => unreachable!(),
558        }
559
560        log::info!("ignore last args");
561        let mut ret = vec![];
562        for &id in ids {
563            // TODO: 平行?
564            ret.push(self.make_last_time_record(id).await?);
565        }
566        Ok(ret)
567    }
568
569    pub async fn amend_args_by_id(
570        &self,
571        event_id: NonZeroU64,
572        args: &str,
573        envs: Option<&str>,
574    ) -> Result<(), DBError> {
575        let event_id = event_id.get() as i64;
576
577        macro_rules! amend {
578            ($($set:literal, $var:expr),*) => {{
579                sqlx::query!(
580                    "UPDATE events SET ignored = false, args = ?"
581                    + $( "," + $set + "=? " +)*
582                    "WHERE type = ? AND id = ? ",
583                    args,
584                    $($var,)*
585                    EXEC_CODE,
586                    event_id,
587                )
588                .execute(&*self.pool.read().unwrap())
589                .await?
590            }}
591        }
592        if let Some(envs) = envs {
593            amend!("envs", envs);
594        } else {
595            amend!();
596        }
597        Ok(())
598    }
599
600    /// 除了輸入進來的 script id 外,其它事件通通砍除
601    pub async fn clear_except_script_ids(&self, script_ids: &[i64]) -> Result<(), DBError> {
602        let ids = join_id_str(script_ids);
603        let pool = self.pool.read().unwrap();
604        // FIXME: 一旦可以綁定陣列就換掉這個醜死人的 instr
605        sqlx::query!(
606            "
607            DELETE FROM events
608            WHERE instr(?, '[' || script_id || ']') <= 0
609            ",
610            ids
611        )
612        .execute(&*pool)
613        .await?;
614
615        sqlx::query!("VACUUM").execute(&*pool).await?;
616
617        Ok(())
618    }
619
620    pub async fn tidy(&self, script_id: i64) -> Result<(), DBError> {
621        let pool = self.pool.read().unwrap();
622        sqlx::query!(
623            "
624            DELETE FROM events
625            WHERE script_id = ?
626              AND id NOT IN (
627                SELECT id FROM
628                  (
629                    SELECT id, MAX(time) FROM events
630                    WHERE script_id = ?
631                      AND type = ?
632                      AND NOT ignored
633                    GROUP BY args, dir
634                  )
635              )
636            ",
637            script_id,
638            script_id,
639            EXEC_CODE,
640        )
641        .execute(&*pool)
642        .await?;
643
644        Ok(())
645    }
646}
647
648fn join_id_str(ids: &[i64]) -> String {
649    use std::fmt::Write;
650    let mut ret = String::new();
651    for id in ids {
652        write!(ret, "[{}]", id).unwrap();
653    }
654    ret
655}