mod convert;
mod migrations;
#[cfg(test)]
mod tests;
use crate::{
collection::{Collection, CollectionFile, ProfileId, RecipeId},
database::convert::{CollectionPath, SqlWrap},
http::{Exchange, ExchangeSummary, RequestId},
};
use chrono::Utc;
use rusqlite::{Connection, OptionalExtension, named_params};
use slumber_util::{ResultTraced, paths};
use std::{
borrow::Cow,
fmt::Debug,
io,
ops::{Deref, DerefMut},
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
use strum::IntoDiscriminant;
use thiserror::Error;
use tracing::{debug, info};
use uuid::Uuid;
const MAX_COMMAND_HISTORY_SIZE: u32 = 100;
#[derive(Clone, Debug)]
pub struct Database {
connection: Arc<Mutex<Connection>>,
}
impl Database {
const FILE: &'static str = "state.sqlite";
pub fn load() -> Result<Self, DatabaseError> {
Self::from_path(&Self::path())
}
pub fn from_directory(directory: &Path) -> Result<Self, DatabaseError> {
let path = directory.join(Self::FILE);
Self::from_path(&path)
}
fn from_path(path: &Path) -> Result<Self, DatabaseError> {
paths::create_parent(path).map_err(DatabaseError::Directory)?;
info!(?path, "Loading database");
let mut connection = Connection::open(path)
.and_then(|conn| {
conn.pragma_update(None, "foreign_keys", "ON")?;
conn.pragma_update(None, "journal_mode", "WAL")?;
Ok(conn)
})
.map_err(DatabaseError::add_context("Opening database"))?;
Self::migrate(&mut connection)?;
Ok(Self {
connection: Arc::new(Mutex::new(connection)),
})
}
pub fn path() -> PathBuf {
paths::data_directory().join(Self::FILE)
}
fn migrate(connection: &mut Connection) -> Result<(), DatabaseError> {
migrations::migrations()
.to_latest(connection)
.map_err(DatabaseError::Migrate)
}
fn connection(&self) -> impl '_ + DerefMut<Target = Connection> {
self.connection.lock().expect("Connection lock poisoned")
}
pub fn get_collections(
&self,
) -> Result<Vec<CollectionMetadata>, DatabaseError> {
self.connection()
.prepare("SELECT * FROM collections")
.and_then(|mut stmt| {
stmt.query_map([], |row| row.try_into())?
.collect::<rusqlite::Result<Vec<_>>>()
})
.map_err(DatabaseError::add_context("Querying collections"))
.traced()
}
pub fn get_collection_id(
&self,
path: &Path,
) -> Result<CollectionId, DatabaseError> {
let path = CollectionPath::try_from_path_maybe_missing(path)?;
self.connection()
.query_row(
"SELECT id FROM collections WHERE path = :path",
named_params! {":path": &path},
|row| row.get::<_, CollectionId>("id"),
)
.map_err(|error| match error {
rusqlite::Error::QueryReturnedNoRows => {
DatabaseError::ResourceUnknown {
kind: "collection",
id: path.to_string(),
}
}
_ => {
DatabaseError::with_context(error, "Querying collection ID")
}
})
.traced()
}
pub fn get_collection_metadata(
&self,
id: CollectionId,
) -> Result<CollectionMetadata, DatabaseError> {
self.connection()
.query_row(
"SELECT * FROM collections WHERE id = :id",
named_params! {":id": id},
|row| CollectionMetadata::try_from(row),
)
.map_err(|error| match error {
rusqlite::Error::QueryReturnedNoRows => {
DatabaseError::ResourceUnknown {
kind: "collection",
id: id.to_string(),
}
}
_ => DatabaseError::with_context(
error,
"Querying collection metadata",
),
})
.traced()
}
pub fn delete_collection(
&self,
collection: CollectionId,
) -> Result<(), DatabaseError> {
let statements = [
"DELETE FROM requests_v2 WHERE collection_id = :id",
"DELETE FROM ui_state_v2 WHERE collection_id = :id",
"DELETE FROM commands WHERE collection_id = :id",
"DELETE FROM collections WHERE id = :id",
];
let mut connection = self.connection();
connection
.transaction()
.and_then(|tx| {
for statement in statements {
tx.prepare(statement)?
.execute(named_params! {":id": collection})?;
}
tx.commit()?;
Ok(())
})
.map_err({
DatabaseError::add_context(format!(
"Deleting collection `{collection}`"
))
})
.traced()
}
pub fn merge_collections(
&self,
source: CollectionId,
target: CollectionId,
) -> Result<(), DatabaseError> {
info!(?source, ?target, "Merging database state");
let mut connection = self.connection();
let tx = connection.transaction()?;
tx.execute(
"UPDATE requests_v2 SET collection_id = :target
WHERE collection_id = :source",
named_params! {":source": source, ":target": target},
)
.map_err(DatabaseError::add_context("Merging table `requests_v2`"))
.traced()?;
tx.execute(
"UPDATE OR REPLACE ui_state_v2 SET collection_id = :target
WHERE collection_id = :source",
named_params! {":source": source, ":target": target},
)
.map_err(DatabaseError::add_context("Merging table `ui_state_v2`"))
.traced()?;
tx.execute(
"UPDATE OR REPLACE commands SET collection_id = :target
WHERE collection_id = :source",
named_params! {":source": source, ":target": target},
)
.map_err(DatabaseError::add_context("Merging table `commands`"))
.traced()?;
tx.execute(
"DELETE FROM collections WHERE id = :source",
named_params! {":source": source},
)
.map_err(DatabaseError::add_context("Deleting source collection"))
.traced()?;
tx.commit()?;
Ok(())
}
pub fn get_all_requests(
&self,
) -> Result<Vec<ExchangeSummary>, DatabaseError> {
self.connection()
.prepare(
"SELECT id, recipe_id, profile_id, start_time, end_time,
status_code FROM requests_v2 ORDER BY start_time DESC",
)
.and_then(|mut stmt| {
stmt.query_map((), |row| row.try_into())?
.collect::<rusqlite::Result<Vec<_>>>()
})
.map_err(DatabaseError::add_context("Querying requests"))
.traced()
}
pub fn delete_request(
&self,
request_id: RequestId,
) -> Result<usize, DatabaseError> {
self.connection()
.execute(
"DELETE FROM requests_v2 WHERE id = :request_id",
named_params! {":request_id": request_id},
)
.map_err({
DatabaseError::add_context(format!(
"Deleting request `{request_id}`"
))
})
.traced()
}
pub fn into_collection(
self,
file: &CollectionFile,
) -> Result<CollectionDatabase, DatabaseError> {
let path = CollectionPath::try_from_path(file.path())?;
self.connection()
.execute(
"INSERT INTO collections (id, path) VALUES (:id, :path)
ON CONFLICT(path) DO NOTHING",
named_params! {
":id": CollectionId::new(),
":path": &path,
},
)
.map_err(DatabaseError::add_context("Setting collection ID"))
.traced()?;
let collection_id = self
.connection()
.query_row(
"SELECT id FROM collections WHERE path = :path",
named_params! {":path": &path},
|row| row.get::<_, CollectionId>("id"),
)
.map_err(DatabaseError::add_context("Querying collection ID"))
.traced()?;
Ok(CollectionDatabase {
collection_id,
database: self,
})
}
}
#[derive(Clone, Debug)]
pub struct CollectionDatabase {
collection_id: CollectionId,
database: Database,
}
impl CollectionDatabase {
pub fn metadata(&self) -> Result<CollectionMetadata, DatabaseError> {
self.database
.connection()
.query_row(
"SELECT * FROM collections WHERE id = :id",
named_params! {":id": self.collection_id},
|row| row.try_into(),
)
.map_err(DatabaseError::add_context("Querying collection path"))
.traced()
}
pub fn root(&self) -> &Database {
&self.database
}
pub fn set_name(&self, collection: &Collection) {
let name = collection.name.as_deref();
let _ = self
.database
.connection()
.execute(
"UPDATE collections SET name = :name WHERE id = :id",
named_params! {":id": self.collection_id, ":name": name},
)
.map_err(DatabaseError::add_context("Updating collection name"))
.traced();
}
pub fn get_request(
&self,
request_id: RequestId,
) -> Result<Option<Exchange>, DatabaseError> {
self.database
.connection()
.query_row(
"SELECT * FROM requests_v2
WHERE collection_id = :collection_id
AND id = :request_id
ORDER BY start_time DESC LIMIT 1",
named_params! {
":collection_id": self.collection_id,
":request_id": request_id,
},
|row| row.try_into(),
)
.optional()
.map_err(DatabaseError::add_context(format!(
"Querying request {request_id} from database"
)))
.traced()
}
pub fn get_latest_request(
&self,
profile_id: ProfileFilter,
recipe_id: &RecipeId,
) -> Result<Option<Exchange>, DatabaseError> {
self.database
.connection()
.query_row(
"SELECT * FROM requests_v2
WHERE collection_id = :collection_id
AND (:ignore_profile_id OR profile_id IS :profile_id)
AND recipe_id = :recipe_id
ORDER BY start_time DESC LIMIT 1",
named_params! {
":collection_id": self.collection_id,
":ignore_profile_id": profile_id == ProfileFilter::All,
":profile_id": profile_id,
":recipe_id": recipe_id,
},
|row| row.try_into(),
)
.optional()
.map_err(DatabaseError::add_context(format!(
"Querying request [profile={profile_id:?}; \
recipe={recipe_id}] from database"
)))
.traced()
}
pub fn get_recipe_requests(
&self,
profile_filter: ProfileFilter,
recipe_id: &RecipeId,
) -> Result<Vec<ExchangeSummary>, DatabaseError> {
self.database
.connection()
.prepare(
"SELECT id, recipe_id, profile_id, start_time, end_time,
status_code FROM requests_v2
WHERE collection_id = :collection_id
AND (:ignore_profile_id OR profile_id IS :profile_id)
AND recipe_id = :recipe_id
ORDER BY start_time DESC",
)?
.query_map(
named_params! {
":collection_id": self.collection_id,
":ignore_profile_id": profile_filter == ProfileFilter::All,
":profile_id": profile_filter,
":recipe_id": recipe_id,
},
|row| row.try_into(),
)
.map_err(DatabaseError::add_context(
"Querying request history from database",
))
.traced()?
.collect::<rusqlite::Result<Vec<_>>>()
.map_err(DatabaseError::add_context("Extracting request history"))
}
pub fn get_all_requests(
&self,
) -> Result<Vec<ExchangeSummary>, DatabaseError> {
self.database
.connection()
.prepare(
"SELECT id, recipe_id, profile_id, start_time, end_time,
status_code FROM requests_v2
WHERE collection_id = :collection_id ORDER BY start_time DESC",
)
.and_then(|mut stmt| {
stmt.query_map(
named_params! {":collection_id": self.collection_id},
|row| row.try_into(),
)?
.collect::<rusqlite::Result<Vec<_>>>()
})
.map_err(DatabaseError::add_context(format!(
"Querying all requests for collection `{}`",
self.collection_id
)))
.traced()
}
pub fn insert_exchange(
&self,
exchange: &Exchange,
) -> Result<(), DatabaseError> {
debug!(
id = %exchange.id,
status = exchange.response.status.as_u16(),
recipe = %exchange.request.recipe_id,
"Adding exchange to database",
);
self.database
.connection()
.execute(
"INSERT INTO
requests_v2 (
id,
collection_id,
profile_id,
recipe_id,
start_time,
end_time,
http_version,
method,
url,
request_headers,
request_body_kind,
request_body,
status_code,
response_headers,
response_body
)
VALUES (
:id,
:collection_id,
:profile_id,
:recipe_id,
:start_time,
:end_time,
:http_version,
:method,
:url,
:request_headers,
:request_body_kind,
:request_body,
:status_code,
:response_headers,
:response_body
)",
named_params! {
":id": exchange.id,
":collection_id": self.collection_id,
":profile_id": &exchange.request.profile_id,
":recipe_id": &exchange.request.recipe_id,
":start_time": &exchange.start_time,
":end_time": &exchange.end_time,
":http_version": exchange.request.http_version,
":method": exchange.request.method,
":url": exchange.request.url.as_str(),
":request_headers": SqlWrap(&exchange.request.headers),
":request_body_kind": exchange.request.body.discriminant(),
":request_body": exchange.request.body.bytes(),
":status_code": exchange.response.status.as_u16(),
":response_headers": SqlWrap(&exchange.response.headers),
":response_body": exchange.response.body.bytes().deref(),
},
)
.map_err({
DatabaseError::add_context(format!(
"Inserting request `{}`",
exchange.id
))
})
.traced()?;
Ok(())
}
pub fn delete_recipe_requests(
&self,
profile_id: ProfileFilter,
recipe_id: &RecipeId,
) -> Result<Vec<RequestId>, DatabaseError> {
info!(
collection = ?self.metadata(),
%recipe_id,
?profile_id,
"Deleting all requests for recipe+profile",
);
self.database
.connection()
.prepare(
"DELETE FROM requests_v2 WHERE collection_id = :collection_id
AND (:ignore_profile_id OR profile_id IS :profile_id)
AND recipe_id = :recipe_id
RETURNING id",
)
.and_then(|mut stmt| {
stmt.query_map(
named_params! {
":collection_id": self.collection_id,
":ignore_profile_id": profile_id == ProfileFilter::All,
":profile_id": profile_id,
":recipe_id": recipe_id,
},
|row| row.get::<_, RequestId>("id"),
)?
.collect::<rusqlite::Result<Vec<_>>>()
})
.map_err({
DatabaseError::add_context(format!(
"Deleting requests for recipe `{recipe_id}`"
))
})
.traced()
}
pub fn delete_request(
&self,
request_id: RequestId,
) -> Result<(), DatabaseError> {
info!(
collection = ?self.metadata(),
%request_id,
"Deleting request"
);
self.database
.connection()
.execute(
"DELETE FROM requests_v2 WHERE id = :request_id",
named_params! {":request_id": request_id},
)
.map_err({
DatabaseError::add_context(format!(
"Deleting request `{request_id}`"
))
})
.traced()?;
Ok(())
}
pub fn get_ui(
&self,
key_type: &str,
key: &str,
) -> Result<Option<String>, DatabaseError> {
let value = self
.database
.connection()
.query_row(
"SELECT value FROM ui_state_v2
WHERE collection_id = :collection_id
AND key_type = :key_type
AND key = :key",
named_params! {
":collection_id": self.collection_id,
":key_type": key_type,
":key": &key,
},
|row| row.get("value"),
)
.optional()
.map_err(DatabaseError::add_context(format!(
"Querying UI state key `{key:?}`"
)))
.traced()?;
Ok(value)
}
pub fn set_ui(&self, settings: &[UiSetting]) -> Result<(), DatabaseError> {
let mut conn = self.database.connection();
conn.transaction()
.and_then(|tx| {
let mut stmt = tx.prepare(
"INSERT INTO ui_state_v2
(collection_id, key_type, key, value)
VALUES (:collection_id, :key_type, :key, :value)
ON CONFLICT DO UPDATE SET value = excluded.value",
)?;
for setting in settings {
stmt.execute(named_params! {
":collection_id": self.collection_id,
":key_type": setting.key_type,
":key": setting.key,
":value": setting.value,
})?;
}
drop(stmt);
tx.commit()
})
.map_err(DatabaseError::add_context("Inserting UI state key"))
.traced()?;
Ok(())
}
pub fn insert_command(&self, command: &str) -> Result<(), DatabaseError> {
let deleted = (|| {
let connection = self.database.connection();
connection.execute(
"INSERT INTO commands (collection_id, command, time)
VALUES (:collection_id, :command, :time)
ON CONFLICT DO UPDATE SET time = :time",
named_params! {
":collection_id": self.collection_id,
":command": command,
":time": Utc::now(),
},
)?;
connection.execute(
"WITH to_delete AS
(SELECT command FROM commands WHERE
collection_id = :collection_id
ORDER BY time DESC
LIMIT -1 OFFSET :max_history_size)
DELETE FROM commands WHERE command IN to_delete",
named_params! {
":collection_id": self.collection_id,
":max_history_size": MAX_COMMAND_HISTORY_SIZE
},
)
})()
.map_err({
DatabaseError::add_context(format!(
"Inserting command `{command:?}`"
))
})
.traced()?;
if deleted > 0 {
debug!("Evicted {deleted} rows from `commands` table");
}
Ok(())
}
pub fn get_commands(
&self,
prefix: &str,
) -> Result<Vec<String>, DatabaseError> {
self.database
.connection()
.prepare(
"SELECT command FROM commands
WHERE collection_id = :collection_id
AND command LIKE :prefix || '%'
ORDER BY time DESC",
)?
.query_map(
named_params! {
":collection_id": self.collection_id,
":prefix": prefix,
},
|row| row.get("command"),
)
.map_err(DatabaseError::add_context(format!(
"Querying commands with prefix `{prefix}`",
)))
.and_then(|cursor| {
cursor.collect::<rusqlite::Result<Vec<_>>>().map_err(
DatabaseError::add_context("Extracting command history"),
)
})
.traced()
}
pub fn get_command(
&self,
offset: u32,
exclude: &str,
) -> Result<Option<String>, DatabaseError> {
self.database
.connection()
.query_row(
"SELECT command FROM commands
WHERE collection_id = :collection_id AND command != :exclude
ORDER BY time DESC
LIMIT 1 OFFSET :offset",
named_params! {
":collection_id": self.collection_id,
":offset": offset,
":exclude": exclude,
},
|row| row.get("command"),
)
.optional()
.map_err(DatabaseError::add_context("Querying commands"))
.traced()
}
pub fn collection_id(&self) -> CollectionId {
self.collection_id
}
}
#[derive(
Copy, Clone, Debug, derive_more::Display, derive_more::FromStr, PartialEq,
)]
#[cfg_attr(any(test, feature = "test"), derive(Eq, Hash))]
pub struct CollectionId(Uuid);
impl CollectionId {
fn new() -> Self {
Self(Uuid::new_v4())
}
}
#[derive(Clone, Debug)]
#[cfg_attr(any(test, feature = "test"), derive(PartialEq))]
pub struct CollectionMetadata {
pub id: CollectionId,
pub path: PathBuf,
pub name: Option<String>,
}
impl CollectionMetadata {
pub fn display_name(&self) -> String {
self.name
.clone()
.unwrap_or_else(|| self.path.display().to_string())
}
}
#[cfg(any(test, feature = "test"))]
impl slumber_util::Factory for Database {
fn factory((): ()) -> Self {
let mut connection = Connection::open_in_memory().unwrap();
Self::migrate(&mut connection).unwrap();
Self {
connection: Arc::new(Mutex::new(connection)),
}
}
}
#[cfg(any(test, feature = "test"))]
impl slumber_util::Factory for CollectionDatabase {
fn factory((): ()) -> Self {
use slumber_util::paths::get_repo_root;
Database::factory(())
.into_collection(
&CollectionFile::new(Some(get_repo_root().join("slumber.yml")))
.unwrap(),
)
.expect("Error initializing DB collection")
}
}
#[derive(Clone, Debug, Default, PartialEq)]
pub enum ProfileFilter<'a> {
None,
Some(Cow<'a, ProfileId>),
#[default]
All,
}
impl ProfileFilter<'_> {
pub fn matches(&self, profile_id: Option<&ProfileId>) -> bool {
match self {
Self::None => profile_id.is_none(),
Self::Some(expected) => profile_id == Some(expected),
Self::All => true,
}
}
pub fn into_owned(self) -> ProfileFilter<'static> {
match self {
Self::None => ProfileFilter::None,
Self::Some(profile_id) => {
ProfileFilter::Some(Cow::Owned(profile_id.into_owned()))
}
Self::All => ProfileFilter::All,
}
}
}
impl<'a> From<&'a ProfileId> for ProfileFilter<'a> {
fn from(profile_id: &'a ProfileId) -> Self {
Self::Some(Cow::Borrowed(profile_id))
}
}
impl<'a> From<Option<&'a ProfileId>> for ProfileFilter<'a> {
fn from(value: Option<&'a ProfileId>) -> Self {
match value {
Some(profile_id) => Self::Some(Cow::Borrowed(profile_id)),
None => Self::None,
}
}
}
impl From<ProfileId> for ProfileFilter<'static> {
fn from(profile_id: ProfileId) -> Self {
Self::Some(Cow::Owned(profile_id))
}
}
impl From<Option<ProfileId>> for ProfileFilter<'static> {
fn from(value: Option<ProfileId>) -> Self {
match value {
Some(profile_id) => Self::Some(Cow::Owned(profile_id)),
None => Self::None,
}
}
}
impl From<Option<Option<ProfileId>>> for ProfileFilter<'static> {
fn from(value: Option<Option<ProfileId>>) -> Self {
match value {
Some(Some(profile_id)) => Self::Some(Cow::Owned(profile_id)),
Some(None) => Self::None,
None => Self::All,
}
}
}
pub struct UiSetting {
pub key_type: &'static str,
pub key: String,
pub value: String,
}
#[derive(Debug, Error)]
pub enum DatabaseError {
#[error("{context}")]
Context { context: String, error: Box<Self> },
#[error("Error creating data directory")]
Directory(#[source] io::Error),
#[error(transparent)]
Migrate(rusqlite_migration::Error),
#[error("Getting collection path `{}`", path.display())]
Path {
path: PathBuf,
#[source]
error: io::Error,
},
#[error("Unknown {kind} `{id}`")]
ResourceUnknown { kind: &'static str, id: String },
#[error(transparent)]
Sqlite(#[from] rusqlite::Error),
}
impl DatabaseError {
pub fn add_context<Ctx, E>(context: Ctx) -> impl FnOnce(E) -> Self
where
Ctx: Into<String>,
E: Into<Self>,
{
move |error| Self::Context {
context: context.into(),
error: Box::new(error.into()),
}
}
pub fn with_context<Ctx, E>(error: E, context: Ctx) -> Self
where
Ctx: Into<String>,
E: Into<Self>,
{
Self::Context {
context: context.into(),
error: Box::new(error.into()),
}
}
}