planetary_db/
lib.rs

1//! Implementation of database support for Planetary.
2
3use std::borrow::Cow;
4use std::fmt;
5
6use anyhow::Result;
7use chrono::DateTime;
8use chrono::Utc;
9use futures::future::BoxFuture;
10use serde::Deserialize;
11use serde::Serialize;
12use tes::v1::types::requests::GetTaskParams;
13use tes::v1::types::requests::ListTasksParams;
14use tes::v1::types::requests::Task as RequestTask;
15use tes::v1::types::responses::OutputFile;
16use tes::v1::types::responses::TaskResponse;
17use tes::v1::types::task::Input;
18use tes::v1::types::task::Output;
19use tes::v1::types::task::State;
20
21#[cfg(feature = "postgres")]
22pub mod postgres;
23
24/// Represents a database error.
25#[derive(Debug, thiserror::Error)]
26pub enum Error {
27    /// The provided page token wasn't valid.
28    #[error("page token `{0}` is not valid")]
29    InvalidPageToken(String),
30    /// A PostgreSQL error occurred.
31    #[cfg(feature = "postgres")]
32    #[error(transparent)]
33    Postgres(#[from] postgres::Error),
34    /// Another type of error occurred during the database operation.
35    #[error(transparent)]
36    Other(#[from] anyhow::Error),
37}
38
39/// The result type for database operations.
40pub type DatabaseResult<T> = Result<T, Error>;
41
42/// Represents information about a task's inputs and outputs.
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct TaskIo {
45    /// The list of inputs for the task.
46    pub inputs: Vec<Input>,
47    /// The list of outputs for the task.
48    pub outputs: Vec<Output>,
49}
50
/// The role a container plays while a task is being processed.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum ContainerKind {
    /// A container that downloads the inputs of a task.
    Inputs,
    /// A container that runs one of the task's executors.
    Executor,
    /// A container that uploads the outputs of a task.
    Outputs,
}
61
62impl fmt::Display for ContainerKind {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        match self {
65            Self::Inputs => write!(f, "inputs"),
66            Self::Executor => write!(f, "executor"),
67            Self::Outputs => write!(f, "outputs"),
68        }
69    }
70}
71
72/// Represents information about a terminated container.
73#[derive(Debug, Clone)]
74pub struct TerminatedContainer<'a> {
75    /// The kind of the container.
76    pub kind: ContainerKind,
77    /// The index of the executor.
78    ///
79    /// This is `None` when the container was not an executor.
80    pub executor_index: Option<i32>,
81    /// The start time of the container.
82    pub start_time: DateTime<Utc>,
83    /// The end time of the container.
84    pub end_time: DateTime<Utc>,
85    /// The stdout of the container.
86    pub stdout: Option<Cow<'a, str>>,
87    /// The stderr of the container.
88    pub stderr: Option<Cow<'a, str>>,
89    /// The exit code of the container.
90    pub exit_code: i32,
91}
92
93/// An abstraction for the planetary database.
94#[async_trait::async_trait]
95pub trait Database: Send + Sync + 'static {
96    /// Inserts a task into the database.
97    ///
98    /// Note: it is expected that the newly inserted task has the `UNKNOWN`
99    /// state.
100    ///
101    /// Returns the generated TES task identifier.
102    async fn insert_task(&self, task: &RequestTask) -> DatabaseResult<String>;
103
104    /// Gets a task from the database.
105    async fn get_task(&self, tes_id: &str, params: GetTaskParams) -> DatabaseResult<TaskResponse>;
106
107    /// Gets tasks from the database.
108    ///
109    /// Returns a list of tasks and the page token to use for the next request.
110    async fn get_tasks(
111        &self,
112        params: ListTasksParams,
113    ) -> DatabaseResult<(Vec<TaskResponse>, Option<String>)>;
114
115    /// Gets the inputs and outputs of a task.
116    async fn get_task_io(&self, tes_id: &str) -> DatabaseResult<TaskIo>;
117
118    /// Gets the TES identifiers of in-progress tasks.
119    ///
120    /// Only tasks created before the given datetime are returned.
121    ///
122    /// An in-progress task is in one of the following states:
123    ///
124    /// * Unknown
125    /// * Queued
126    /// * Initializing
127    /// * Running
128    async fn get_in_progress_tasks(&self, before: DateTime<Utc>) -> DatabaseResult<Vec<String>>;
129
130    /// Updates the state of a task.
131    ///
132    /// The provided message is added to the task's system log if the task is
133    /// transitioned to the given state.
134    ///
135    /// The given future for retrieving the terminated containers will be called
136    /// if the task is transitioned to the given state; the returned containers
137    /// are then recorded in the database.
138    ///
139    /// Returns `Ok(true)` if the status was updated or `Ok(false)` if the
140    /// task's current state cannot be transitioned to the given state.
141    async fn update_task_state<'a>(
142        &self,
143        tes_id: &str,
144        state: State,
145        messages: &[&str],
146        containers: Option<BoxFuture<'a, Result<Vec<TerminatedContainer<'a>>>>>,
147    ) -> DatabaseResult<bool>;
148
149    /// Appends the given messages to the task's system log.
150    async fn append_system_log(&self, tes_id: &str, messages: &[&str]) -> DatabaseResult<()>;
151
152    /// Updates the output files of the given task.
153    async fn update_task_output_files(
154        &self,
155        tes_id: &str,
156        files: &[OutputFile],
157    ) -> DatabaseResult<()>;
158
159    /// Inserts an internal system error with the database.
160    async fn insert_error(
161        &self,
162        source: &str,
163        tes_id: Option<&str>,
164        message: &str,
165    ) -> DatabaseResult<()>;
166}
167
/// Formats a log message, prefixing it with the current UTC time stamp.
///
/// Takes the same arguments as [`format!`] and evaluates to a `String` of
/// the form `[<time stamp>] <message>`.
///
/// The expansion names `chrono::Utc` directly, so `chrono` must be
/// resolvable at the call site.
#[macro_export]
macro_rules! format_log_message {
    ($($tokens:tt)*) => {
        format!(
            "[{ts}] {args}",
            ts = chrono::Utc::now().format("%Y-%m-%dT%H:%M:%S%.6fZ"),
            args = format_args!($($tokens)*)
        )
    };
}