1use std::borrow::Cow;
4use std::fmt;
5
6use anyhow::Result;
7use chrono::DateTime;
8use chrono::Utc;
9use futures::future::BoxFuture;
10use serde::Deserialize;
11use serde::Serialize;
12use tes::v1::types::requests::GetTaskParams;
13use tes::v1::types::requests::ListTasksParams;
14use tes::v1::types::requests::Task as RequestTask;
15use tes::v1::types::responses::OutputFile;
16use tes::v1::types::responses::TaskResponse;
17use tes::v1::types::task::Input;
18use tes::v1::types::task::Output;
19use tes::v1::types::task::State;
20
21#[cfg(feature = "postgres")]
22pub mod postgres;
23
24#[derive(Debug, thiserror::Error)]
26pub enum Error {
27 #[error("page token `{0}` is not valid")]
29 InvalidPageToken(String),
30 #[cfg(feature = "postgres")]
32 #[error(transparent)]
33 Postgres(#[from] postgres::Error),
34 #[error(transparent)]
36 Other(#[from] anyhow::Error),
37}
38
39pub type DatabaseResult<T> = Result<T, Error>;
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct TaskIo {
45 pub inputs: Vec<Input>,
47 pub outputs: Vec<Output>,
49}
50
/// Identifies which kind of container a [`TerminatedContainer`] describes.
///
/// The `Display` implementation renders each variant as its lowercase name.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ContainerKind {
    /// Displayed as `inputs`.
    ///
    /// NOTE(review): presumably the container that handles task inputs —
    /// confirm.
    Inputs,
    /// Displayed as `executor`.
    ///
    /// NOTE(review): presumably a container running one of the task's
    /// executors — confirm.
    Executor,
    /// Displayed as `outputs`.
    ///
    /// NOTE(review): presumably the container that handles task outputs —
    /// confirm.
    Outputs,
}
61
62impl fmt::Display for ContainerKind {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 match self {
65 Self::Inputs => write!(f, "inputs"),
66 Self::Executor => write!(f, "executor"),
67 Self::Outputs => write!(f, "outputs"),
68 }
69 }
70}
71
72#[derive(Debug, Clone)]
74pub struct TerminatedContainer<'a> {
75 pub kind: ContainerKind,
77 pub executor_index: Option<i32>,
81 pub start_time: DateTime<Utc>,
83 pub end_time: DateTime<Utc>,
85 pub stdout: Option<Cow<'a, str>>,
87 pub stderr: Option<Cow<'a, str>>,
89 pub exit_code: i32,
91}
92
93#[async_trait::async_trait]
95pub trait Database: Send + Sync + 'static {
96 async fn insert_task(&self, task: &RequestTask) -> DatabaseResult<String>;
103
104 async fn get_task(&self, tes_id: &str, params: GetTaskParams) -> DatabaseResult<TaskResponse>;
106
107 async fn get_tasks(
111 &self,
112 params: ListTasksParams,
113 ) -> DatabaseResult<(Vec<TaskResponse>, Option<String>)>;
114
115 async fn get_task_io(&self, tes_id: &str) -> DatabaseResult<TaskIo>;
117
118 async fn get_in_progress_tasks(&self, before: DateTime<Utc>) -> DatabaseResult<Vec<String>>;
129
130 async fn update_task_state<'a>(
142 &self,
143 tes_id: &str,
144 state: State,
145 messages: &[&str],
146 containers: Option<BoxFuture<'a, Result<Vec<TerminatedContainer<'a>>>>>,
147 ) -> DatabaseResult<bool>;
148
149 async fn append_system_log(&self, tes_id: &str, messages: &[&str]) -> DatabaseResult<()>;
151
152 async fn update_task_output_files(
154 &self,
155 tes_id: &str,
156 files: &[OutputFile],
157 ) -> DatabaseResult<()>;
158
159 async fn insert_error(
161 &self,
162 source: &str,
163 tes_id: Option<&str>,
164 message: &str,
165 ) -> DatabaseResult<()>;
166}
167
/// Formats a log message prefixed with the current UTC timestamp.
///
/// Accepts the same arguments as [`format!`] and produces a `String` of the
/// form `[<timestamp>] <message>`, where the timestamp is rendered with the
/// pattern `%Y-%m-%dT%H:%M:%S%.6fZ`.
///
/// The expansion refers to `chrono::Utc` by a bare path, so `chrono` must be
/// resolvable at the call site.
#[macro_export]
macro_rules! format_log_message {
    ($($arg:tt)*) => {
        format!(
            "[{ts}] {args}",
            ts = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.6fZ"),
            args = format_args!($($arg)*)
        )
    };
}