use log::{info, error};
use mediatype::MediaTypeBuf;
use mediatype::ReadParams;
use rusqlite;
use rusqlite::types::Type;
use rusqlite::Connection;
use rusqlite::Error;
use rusqlite::OpenFlags;
use rusqlite::params_from_iter;
use criterium::rusqlite::RusqliteQuery;
use criterium::rusqlite::AssembleRusqliteQuery;
use criterium::CriteriumChain;
use rusqlite::OptionalExtension;
use crate::database::BaseSchema;
use crate::database::DatabaseError;
use crate::database::DatabaseOpeningError;
use crate::database::Page;
use crate::database::id_cache::UrlCache;
use crate::Origin;
use crate::criterium::OriginCriterium;
use crate::mediatype::normalize_mimetype;
use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use std::backtrace::Backtrace;
use crate::database::error::IntoDatabaseErrorExtension;
use crate::database::id::OriginId;
use crate::database::id::UrlId;
use crate::url::UrlWithoutFragment;
use super::id::MimetypeId;
use super::id::NumericDatabseId;
/// Assembled rusqlite query (WHERE clause text plus its bound values) for the `BaseSchema`.
type Query = RusqliteQuery<BaseSchema>;
/// Handle to an SQLite database file plus bookkeeping about how it was opened.
pub struct Database {
/// The underlying rusqlite connection.
pub connection: Connection,
/// URL <-> id cache; the same `Arc` is handed to every transaction started from this handle.
url_cache: Arc<RwLock<UrlCache>>,
/// `true` when the handle was created by `open_read_only`.
is_read_only: bool,
/// Path the database was opened from.
path: PathBuf,
/// `true` when the file did not exist right before `open_writable` opened it.
is_new: bool,
}
/// A running rusqlite transaction together with the state it inherits from its `Database`.
pub struct DatabaseTransaction<'a> {
/// The underlying rusqlite transaction.
pub transaction: rusqlite::Transaction<'a>,
/// URL <-> id cache shared with the `Database` that started this transaction.
url_cache: Arc<RwLock<UrlCache>>,
/// Copy of the owning database's read-only flag, taken when the transaction was started.
is_read_only: bool,
}
impl Database {
/// Opens the database at `path` for reading and writing.
///
/// Whether the file existed is sampled *before* the connection is opened and
/// is later reported by `is_new()`. The base schema version is checked via
/// `check_base_schema` (which skips the check for new files).
///
/// # Errors
/// * the error produced by `error_while_opening` when the connection cannot be opened,
/// * `DatabaseOpeningError::DatabaseIsNotWritable` when the `main` database is
///   read-only or its read-only state cannot be determined,
/// * any error from `check_base_schema`.
pub fn open_writable(
    path: impl AsRef<Path>
) -> Result<Self, DatabaseOpeningError> {
    let db_path = path.as_ref();
    // Has to be sampled before the connection is opened.
    let file_is_missing = !db_path.exists();
    let connection = rusqlite::Connection::open(db_path)
        .error_while_opening(db_path)?;
    // An undeterminable state counts as "read only".
    let writable = !connection.is_readonly("main").unwrap_or(true);
    if !writable {
        return Err(DatabaseOpeningError::DatabaseIsNotWritable {
            path: db_path.into(),
        });
    }
    connection.set_prepared_statement_cache_capacity(32);
    let database = Self {
        connection,
        url_cache: Arc::new(UrlCache::new().into()),
        is_read_only: false,
        path: db_path.into(),
        is_new: file_is_missing,
    };
    database.check_base_schema()?;
    Ok(database)
}
/// Opens an already existing database file at `path` in read-only mode.
///
/// The connection is opened with the `SQLITE_OPEN_READ_ONLY`, `SQLITE_OPEN_URI`
/// and `SQLITE_OPEN_NO_MUTEX` flags, and the base schema version is checked
/// before the handle is returned.
///
/// # Errors
/// * `DatabaseOpeningError::DatabaseFileNotFound` when `path` is not a file,
/// * the error produced by `error_while_opening` when the connection cannot be opened,
/// * any error from `check_base_schema`.
pub fn open_read_only(
    path: impl AsRef<Path>
) -> Result<Self, DatabaseOpeningError> {
    let db_path = path.as_ref();
    if !db_path.is_file() {
        return Err(DatabaseOpeningError::DatabaseFileNotFound {
            path: db_path.into(),
        });
    }
    let flags = OpenFlags::SQLITE_OPEN_READ_ONLY
        | OpenFlags::SQLITE_OPEN_URI
        | OpenFlags::SQLITE_OPEN_NO_MUTEX;
    let connection = rusqlite::Connection::open_with_flags(db_path, flags)
        .error_while_opening(db_path)?;
    connection.set_prepared_statement_cache_capacity(32);
    let database = Self {
        connection,
        url_cache: Arc::new(UrlCache::new().into()),
        is_read_only: true,
        path: db_path.into(),
        is_new: false,
    };
    database.check_base_schema()?;
    Ok(database)
}
/// Runs best-effort maintenance on the database: limits the analysis work,
/// asks SQLite to optimize, then vacuums.
///
/// Failures are deliberately ignored (for example on a read-only database);
/// this method never reports an error.
pub fn optimize(&self) {
    // `execute_batch` is used instead of `execute` because a PRAGMA may
    // return a row, which `execute` treats as an error.
    self.connection.execute_batch("PRAGMA analysis_limit=400;").ok();
    // The pragma name was previously misspelled ("optimze"); SQLite ignores
    // unknown pragmas silently, so the optimization never actually ran.
    self.connection.execute_batch("PRAGMA optimize;").ok();
    self.connection.execute_batch("vacuum;").ok();
}
/// Returns a reference to the underlying rusqlite connection.
pub fn connection(&self) -> &rusqlite::Connection {
&self.connection
}
/// Returns the path this database was opened from.
pub fn path(&self) -> &Path {
self.path.as_path()
}
/// Returns `true` when this handle was opened in read-only mode.
pub fn is_read_only(&self) -> bool {
self.is_read_only
}
/// Returns `true` when the database file did not exist before it was opened.
pub fn is_new(&self) -> bool {
self.is_new
}
/// Guards write operations: succeeds only when the database is writable.
///
/// On a read-only database the attempt is logged (together with a captured
/// backtrace) and `DatabaseError::AttemptToWriteToReadOnlyDatabse` carrying
/// the name passed as `function` is returned.
pub(super) fn assert_writable(&self, function: &str) -> Result<(), DatabaseError> {
    if !self.is_read_only {
        return Ok(());
    }
    error!("Attempt to write to read only database in function {function}!");
    error!("Backtrace: {}", Backtrace::capture());
    Err(DatabaseError::AttemptToWriteToReadOnlyDatabse {
        function: function.to_owned(),
    })
}
/// Creates the tables and indexes of the base schema (if they do not exist
/// yet) and records the base schema version `0.1.0`.
///
/// Every statement uses `IF NOT EXISTS`, so calling this on an already
/// initialized database leaves existing tables untouched.
///
/// # Errors
/// * `DatabaseError::AttemptToWriteToReadOnlyDatabse` on a read-only database,
/// * any error raised while executing one of the schema statements or while
///   storing the schema version.
pub fn initalize_base_database(&self) -> Result<(), DatabaseError> {
    // (log label, SQL) pairs, executed in order.
    const SCHEMA_STEPS: [(&str, &str); 8] = [
        (
            "Table: unobtanium_database_info",
            // `value` is TEXT. It used to be declared as `TEST`, a typo that
            // gives the column NUMERIC affinity, so numeric-looking values
            // would be stored as numbers and fail to read back as strings.
            "CREATE TABLE IF NOT EXISTS unobtanium_database_info (
                key TEXT NOT NULL PRIMARY KEY,
                value TEXT NOT NULL
            );",
        ),
        (
            "Table: origin",
            "CREATE TABLE IF NOT EXISTS origin (
                origin_id INTEGER NOT NULL PRIMARY KEY,
                port INTEGER NULL,
                scheme VARCHAR(7) NOT NULL,
                host VARCHAR(255) NULL,
                str_origin TEXT UNIQUE NOT NULL
            );",
        ),
        (
            "Index: origin_by_host on origin",
            "CREATE INDEX IF NOT EXISTS origin_by_host ON origin(host,origin_id);",
        ),
        (
            "Table: url",
            "CREATE TABLE IF NOT EXISTS url (
                url_id INTEGER NOT NULL PRIMARY KEY,
                origin_id INTEGER NOT NULL,
                path TEXT NULL,
                query TEXT NULL,
                username TEXT NULL,
                password TEXT NULL,
                str_url TEXT UNIQUE NOT NULL
            );",
        ),
        (
            "Index: url_string on url",
            "CREATE INDEX IF NOT EXISTS url_string ON url(str_url,url_id);",
        ),
        (
            "Table: mimetype",
            "CREATE TABLE IF NOT EXISTS mimetype (
                mimetype_id INTEGER NOT NULL PRIMARY KEY,
                mime_type TEXT NOT NULL,
                mime_subtype TEXT NOT NULL,
                mime_suffix TEXT NULL,
                charset TEXT NULL,
                str_mimetype TEXT UNIQUE NOT NULL
            );",
        ),
        (
            "Table: mime_parameter",
            "CREATE TABLE IF NOT EXISTS mime_parameter (
                mime_parameter_id INTEGER NOT NULL PRIMARY KEY,
                mimetype_id INTEGER NOT NULL,
                key TEXT NOT NULL,
                value TEXT NOT NULL
            );",
        ),
        (
            "Index: mimetype_string on mimetype",
            "CREATE INDEX IF NOT EXISTS mimetype_string
                ON mimetype(str_mimetype,mimetype_id);",
        ),
    ];

    self.assert_writable("initalize_base_database")?;
    info!("Initalizing database (base part) ...");
    for &(label, sql) in SCHEMA_STEPS.iter() {
        info!("{label} ...");
        self.connection.execute(sql, ())?;
    }
    info!("Setting base schema version ...");
    self.set_database_info("unobtanium_base_schema_version", Some("0.1.0"))?;
    info!("Database (base part) successfully initialized!");
    Ok(())
}
/// Verifies that the stored base schema version is the supported `0.1.0`.
///
/// A database flagged as new is accepted without any lookup.
///
/// # Errors
/// * the lookup error (wrapped by `while_initlizing`) when the version cannot be read,
/// * `DatabaseOpeningError::DatabaseUnversioned` when no version is stored,
/// * `DatabaseOpeningError::WrongSchemaVersion` when a different version is stored.
pub fn check_base_schema(&self) -> Result<(), DatabaseOpeningError> {
    if self.is_new {
        return Ok(());
    }
    let stored_version = self
        .fetch_database_info("unobtanium_base_schema_version")
        .map_err(|e| e.while_initlizing(self.path.clone()))?;
    match stored_version {
        None => Err(DatabaseOpeningError::DatabaseUnversioned {
            path: self.path.clone(),
        }),
        Some(version) if version == "0.1.0" => Ok(()),
        Some(version) => Err(DatabaseOpeningError::WrongSchemaVersion {
            path: self.path.clone(),
            schema: "base".to_string(),
            got_version: version,
            expected_version: "0.1.0".to_string(),
        }),
    }
}
pub fn start_transaction(&mut self) -> Result<DatabaseTransaction, DatabaseError> {
return Ok(DatabaseTransaction {
transaction: self.connection.transaction()?,
url_cache: self.url_cache.clone(),
is_read_only: self.is_read_only
});
}
/// Stores, replaces or removes an entry of the `unobtanium_database_info` table.
///
/// * `Some(value)` inserts the entry or replaces an existing one for `key`.
/// * `None` deletes the entry for `key`.
///
/// # Errors
/// Returns the converted rusqlite error when the statement fails.
pub fn set_database_info(
    &self,
    key: &str,
    value: Option<&str>
) -> Result<(), DatabaseError> {
    match value {
        Some(new_value) => {
            self.connection.execute(
                "INSERT OR REPLACE INTO unobtanium_database_info (key, value) VALUES (?,?)",
                (key, new_value),
            )?;
        }
        None => {
            self.connection.execute(
                "DELETE FROM unobtanium_database_info WHERE key = ?",
                (key,),
            )?;
        }
    }
    Ok(())
}
/// Looks up the value stored for `key` in the `unobtanium_database_info` table.
///
/// Returns `Ok(None)` when no row exists for `key`.
///
/// # Errors
/// Returns the converted rusqlite error for every failure other than "no rows".
pub fn fetch_database_info(
    &self,
    key: &str
) -> Result<Option<String>, DatabaseError> {
    let lookup = self.connection.query_row(
        "SELECT value\n\
         FROM unobtanium_database_info\n\
         WHERE key = ?\n\
         LIMIT 1",
        (key,),
        |row| row.get::<_, String>(0),
    );
    // `optional()` maps "no rows" to `None`; other errors are propagated.
    Ok(lookup.optional()?)
}
/// Reads the id of an already stored origin, matching on scheme, host and port.
///
/// Host and port are compared with `is`, so `NULL` columns match `None` values.
///
/// # Errors
/// Returns the converted rusqlite error, including the "no rows" case when
/// the origin is not stored.
pub fn read_origin_id(&self, origin: &Origin) -> Result<OriginId, DatabaseError> {
    let mut lookup = self.connection.prepare_cached(
        "SELECT origin_id FROM origin WHERE scheme = ? AND host is ? AND port is ?"
    )?;
    let origin_id: OriginId = lookup.query_row(
        (&origin.scheme, &origin.host, origin.port),
        |row| row.get(0),
    )?;
    Ok(origin_id)
}
/// Loads the origin (scheme, host, port) stored under `origin_id`.
///
/// # Errors
/// Returns the converted rusqlite error, including the "no rows" case when
/// no origin has that id.
pub fn get_origin_by_id(&self, origin_id: OriginId) -> Result<Origin, DatabaseError> {
    let origin = self.connection.query_row(
        "SELECT scheme,host,port FROM origin WHERE origin_id = ?",
        (origin_id,),
        |row| {
            Ok(Origin {
                scheme: row.get(0)?,
                host: row.get(1)?,
                port: row.get(2)?,
            })
        },
    )?;
    Ok(origin)
}
/// Lists stored origins, optionally filtered by scheme, host and/or port.
///
/// A filter that is `None` places no restriction on its column: its
/// placeholder is bound to `NULL` and tested with `? is NULL`, which is
/// always true. The result window is taken from `page`.
///
/// # Errors
/// Returns the converted rusqlite error when preparing or running the query
/// or reading a row fails.
pub fn list_origins(
    &self,
    scheme: Option<String>,
    host: Option<String>,
    port: Option<u16>,
    page: &Page,
) -> Result<Vec<Origin>, DatabaseError> {
    // One condition per filter, in the same order the parameters are bound.
    let conditions = [
        ("scheme = ?", scheme.is_some()),
        ("host = ?", host.is_some()),
        ("port = ?", port.is_some()),
    ]
    .iter()
    .map(|&(equals, is_set)| if is_set { equals } else { "? is NULL" })
    .collect::<Vec<&str>>()
    .join(" AND ");

    let mut sql = String::from("\nSELECT scheme,host,port FROM origin\nWHERE ");
    sql.push_str(&conditions);
    sql.push_str("\nLIMIT ? OFFSET ?\n");

    let mut statement = self.connection.prepare(sql.as_str())?;
    let rows = statement.query_map(
        (scheme, host, port, page.limit(), page.offset()),
        |row| {
            Ok(Origin {
                scheme: row.get(0)?,
                host: row.get(1)?,
                port: row.get(2)?,
            })
        },
    )?;
    rows.map(|row| row.map_err(Into::into)).collect()
}
/// Lists stored origins matching `criteria`, windowed by `page`.
///
/// The criteria are assembled into a WHERE clause plus bound values; the
/// page limit and offset are appended to those values for the trailing
/// `LIMIT ? OFFSET ?`.
///
/// # Errors
/// Returns the converted rusqlite error when preparing or running the query
/// or reading a row fails.
pub fn query_origins(
    &self,
    criteria: CriteriumChain<OriginCriterium>,
    page: &Page,
) -> Result<Vec<Origin>, DatabaseError> {
    let mut assembly: Query = criteria.assemble_rusqlite_query_for_db(&());
    // Limit and offset come last, matching the placeholder order in the SQL.
    assembly.where_values.push(page.limit().into());
    assembly.where_values.push(page.offset().into());

    let mut sql = String::from("\nSELECT scheme,host,port FROM origin\nWHERE ");
    sql.push_str(&assembly.sql_where_clause);
    sql.push_str("\nLIMIT ? OFFSET ?\n");

    let mut statement = self.connection.prepare(sql.as_str())?;
    let rows = statement.query_map(
        params_from_iter(assembly.where_values.iter()),
        |row| {
            Ok(Origin {
                scheme: row.get(0)?,
                host: row.get(1)?,
                port: row.get(2)?,
            })
        },
    )?;
    rows.map(|row| row.map_err(Into::into)).collect()
}
/// Lists the distinct schemes of all stored origins, windowed by `page`.
///
/// Results are ordered by scheme so that consecutive pages are stable.
///
/// # Errors
/// Returns the converted rusqlite error when preparing or running the query
/// or reading a row fails.
pub fn get_known_schemes(
    &self,
    page: &Page,
) -> Result<Vec<String>, DatabaseError> {
    let mut statement = self.connection.prepare(
        "SELECT DISTINCT scheme FROM origin ORDER BY scheme LIMIT ? OFFSET ?"
    )?;
    let rows = statement.query_map(
        (page.limit(), page.offset()),
        // The result has exactly one column; column indexes are zero-based
        // (index 1 was read before, which fails for every row).
        |row| row.get::<usize, String>(0),
    )?;
    rows.map(|row| row.map_err(Into::into)).collect()
}
/// Lists the distinct hosts of all stored origins, windowed by `page`.
///
/// Origins without a host (`NULL`) are skipped because the result type
/// cannot represent them. Results are ordered by host so that consecutive
/// pages are stable.
///
/// # Errors
/// Returns the converted rusqlite error when preparing or running the query
/// or reading a row fails.
pub fn get_known_hosts(
    &self,
    page: &Page,
) -> Result<Vec<String>, DatabaseError> {
    // The statement previously lacked `FROM` ("SELECT DISTINCT host origin"),
    // which SQLite parses as a column alias and rejects.
    let mut statement = self.connection.prepare(
        "SELECT DISTINCT host FROM origin WHERE host IS NOT NULL ORDER BY host LIMIT ? OFFSET ?"
    )?;
    let rows = statement.query_map(
        (page.limit(), page.offset()),
        // Single-column result: the zero-based index is 0, not 1.
        |row| row.get::<usize, String>(0),
    )?;
    rows.map(|row| row.map_err(Into::into)).collect()
}
/// Lists the distinct ports of all stored origins, windowed by `page`.
///
/// Origins without a port (`NULL`) are skipped because the result type
/// cannot represent them. Results are ordered by port so that consecutive
/// pages are stable.
///
/// # Errors
/// Returns the converted rusqlite error when preparing or running the query
/// or reading a row fails.
pub fn get_known_ports(
    &self,
    page: &Page,
) -> Result<Vec<u16>, DatabaseError> {
    let mut statement = self.connection.prepare(
        "SELECT DISTINCT port FROM origin WHERE port IS NOT NULL ORDER BY port LIMIT ? OFFSET ?"
    )?;
    let rows = statement.query_map(
        (page.limit(), page.offset()),
        // Single-column result: the zero-based index is 0, not 1.
        |row| row.get::<usize, u16>(0),
    )?;
    rows.map(|row| row.map_err(Into::into)).collect()
}
/// Returns the id of `url`, storing the URL first when it is not known yet.
///
/// The URL cache is consulted first; on a miss the lookup/insert runs in a
/// transaction of its own which is committed before returning.
///
/// # Errors
/// * `DatabaseError::AttemptToWriteToReadOnlyDatabse` on a read-only database,
/// * any error from starting the transaction, the lookup/insert, or the commit.
///
/// # Panics
/// Panics when the URL cache lock is poisoned.
pub fn get_or_add_url_id(
    &mut self,
    url: &UrlWithoutFragment
) -> Result<UrlId, DatabaseError> {
    self.assert_writable("get_or_add_url_id")?;
    let cached = self.url_cache.read().unwrap().get_id(url);
    match cached {
        Some(id) => Ok(id),
        None => {
            let transaction = self.start_transaction()?;
            let url_id = transaction.get_url_id(url, true)?;
            transaction.commit()?;
            Ok(url_id)
        }
    }
}
/// Reads the id of an already stored `url` without ever inserting it.
///
/// A cache hit is returned directly; a database hit is added to the cache.
///
/// # Errors
/// Returns the converted rusqlite error, including the "no rows" case when
/// the URL is not stored.
///
/// # Panics
/// Panics when the URL cache lock is poisoned.
pub fn read_url_id(
    &self,
    url: &UrlWithoutFragment,
) -> Result<UrlId, DatabaseError> {
    let cached = self.url_cache.read().unwrap().get_id(url);
    if let Some(id) = cached {
        return Ok(id);
    }
    let mut lookup = self.connection.prepare_cached(
        "SELECT url_id FROM url WHERE str_url = ?",
    )?;
    let url_id: UrlId = lookup.query_row((url,), |row| row.get(0))?;
    self.url_cache.write().unwrap().push(url_id, url);
    Ok(url_id)
}
/// Resolves `url_id` to its URL.
///
/// A cache hit is returned directly; a database hit is added to the cache.
///
/// # Errors
/// Returns the converted rusqlite error, including the "no rows" case when
/// no URL has that id.
///
/// # Panics
/// Panics when the URL cache lock is poisoned.
pub fn get_url_by_id(
    &self,
    url_id: UrlId
) -> Result<UrlWithoutFragment, DatabaseError> {
    let cached = self.url_cache.read().unwrap().get_url(url_id);
    if let Some(url) = cached {
        return Ok(url);
    }
    let mut lookup = self.connection.prepare_cached(
        "SELECT str_url\n\
         FROM url\n\
         WHERE url_id = ?"
    )?;
    let url: UrlWithoutFragment = lookup.query_row((url_id,), |row| row.get(0))?;
    self.url_cache.write().unwrap().push(url_id, &url);
    Ok(url)
}
/// Resolves every id in `url_ids` to its URL and returns them as a map.
///
/// The URL cache is write-locked for the whole call; ids missing from the
/// cache are read from the database and added to the cache. The first id
/// that cannot be resolved aborts the call with its error.
///
/// # Errors
/// Returns the converted rusqlite error, including the "no rows" case when
/// one of the ids is not stored.
///
/// # Panics
/// Panics when the URL cache lock is poisoned.
pub fn get_url_by_id_bulk(
    &self,
    url_ids: &Vec<UrlId>
) -> Result<HashMap<UrlId,UrlWithoutFragment>, DatabaseError> {
    let mut lookup = self.connection.prepare_cached(
        "SELECT str_url\n\
         FROM url\n\
         WHERE url_id = ?"
    )?;
    let mut cache = self.url_cache.write().unwrap();
    let mut resolved = HashMap::with_capacity(url_ids.len());
    for &url_id in url_ids {
        let url = match cache.get_url(url_id) {
            Some(hit) => hit,
            None => {
                let fetched: UrlWithoutFragment =
                    lookup.query_row((url_id,), |row| row.get(0))?;
                cache.push(url_id, &fetched);
                fetched
            }
        };
        resolved.insert(url_id, url);
    }
    Ok(resolved)
}
pub fn get_mimetype_by_id(
&self, mimetype_id: MimetypeId
) -> Result<MediaTypeBuf, DatabaseError> {
let mut statement = self.connection.prepare_cached(
"SELECT str_mimetype
FROM mimetype
WHERE mimetype_id = ?"
)?;
let mimetype_string: String = statement.query_row((mimetype_id,), |row| {
row.get(0)
})?;
MediaTypeBuf::from_string(
mimetype_string.clone()
).map_err(|e| Error::FromSqlConversionFailure(
0, Type::Text, e.into()
).into())
}
}
impl DatabaseTransaction<'_> {
/// Commits the transaction, consuming it.
///
/// # Errors
/// Returns the converted rusqlite error when the commit fails.
pub fn commit(self) -> Result<(), DatabaseError> {
    self.transaction.commit().map_err(Into::into)
}
/// Returns `true` when this transaction was started on a read-only database.
pub fn is_read_only(&self) -> bool {
self.is_read_only
}
/// Guards write operations: succeeds only when the transaction is writable.
///
/// On a read-only transaction the attempt is logged (together with a
/// captured backtrace) and `DatabaseError::AttemptToWriteToReadOnlyDatabse`
/// carrying the name passed as `function` is returned.
pub(super) fn assert_writable(&self, function: &str) -> Result<(), DatabaseError> {
    if !self.is_read_only {
        return Ok(());
    }
    error!("Attempt to write to read only database in function {function}!");
    error!("Backtrace: {}", Backtrace::capture());
    Err(DatabaseError::AttemptToWriteToReadOnlyDatabse {
        function: function.to_owned(),
    })
}
pub fn get_origin_id(
&self,
origin: &Origin,
autoadd: bool
) -> Result<OriginId, DatabaseError> {
if autoadd {
self.assert_writable("get_origin_id(autoadd=true)")?;
}
let mut query_statement = self.transaction.prepare_cached(
"SELECT origin_id FROM origin WHERE scheme = ? AND host is ? AND port is ?"
)?;
match query_statement.query_row(
(&origin.scheme, &origin.host, origin.port),
|row| row.get(0)
) {
Ok(res) => { return Ok(res); }
Err(e) => {
if !autoadd {
return Err(e.into());
}
}
}
let query = "INSERT INTO origin (scheme, host, port, str_origin) VALUES (?,?,?,?)";
let mut insert_statement = self.transaction.prepare_cached(query)?;
insert_statement.execute((
&origin.scheme,
&origin.host,
origin.port,
origin.to_string()
))?;
return Ok(OriginId::new(self.transaction.last_insert_rowid()));
}
/// Reads the id of an already stored `url` inside this transaction without
/// ever inserting it.
///
/// A cache hit is returned directly; a database hit is added to the cache.
///
/// # Errors
/// Returns the raw rusqlite error, including the "no rows" case when the URL
/// is not stored.
///
/// # Panics
/// Panics when the URL cache lock is poisoned.
pub fn read_url_id(
    &self,
    url: &UrlWithoutFragment,
) -> Result<UrlId, Error> {
    let cached = self.url_cache.read().unwrap().get_id(url);
    if let Some(id) = cached {
        return Ok(id);
    }
    let mut lookup = self.transaction.prepare_cached(
        "SELECT url_id FROM url WHERE str_url = ?",
    )?;
    let url_id: UrlId = lookup.query_row((url,), |row| row.get(0))?;
    self.url_cache.write().unwrap().push(url_id, url);
    Ok(url_id)
}
pub fn get_url_id(
&self,
url: &UrlWithoutFragment,
autoadd: bool
) -> Result<UrlId, DatabaseError> {
if autoadd {
self.assert_writable("get_origin_id(autoadd=true)")?;
}
if let Some(id) = self.url_cache.read().unwrap().get_id(url) {
return Ok(id);
}
let mut select_statement = self.transaction.prepare_cached(
"SELECT url_id FROM url WHERE str_url = ?",
)?;
match select_statement.query_row(
(&url,),
|row| row.get(0)
) {
Ok(url_id) => {
self.url_cache.write().unwrap().push(url_id, url);
return Ok(url_id);
}
Err(e) => {
if !autoadd {
return Err(e.into());
}
}
}
let origin_id = self.get_origin_id(
&Origin::from_url(url).unwrap_or(Origin{
scheme: "".to_string(),
host: None,
port: None,
}),
autoadd
)?;
let mut insert_statement = self.transaction.prepare_cached(
"INSERT INTO url (origin_id, path, query, username, password, str_url) VALUES (?,?,?,?,?,?)"
)?;
insert_statement.execute((
origin_id,
&url.path(),
&url.query(),
url.username(),
url.password(),
&url
))?;
let url_id = UrlId::new(self.transaction.last_insert_rowid());
self.url_cache.write().unwrap().push(url_id, url);
return Ok(url_id);
}
pub fn get_mimetype_id(
&mut self,
mimetype: &MediaTypeBuf,
autoadd: bool
) -> Result<MimetypeId, DatabaseError> {
if autoadd {
self.assert_writable("get_mimetype_id(autoadd=true)")?;
}
let mimetype = normalize_mimetype(mimetype);
let string_mediatype = mimetype.to_string();
let mut select_statement = self.transaction.prepare_cached(
"SELECT mimetype_id FROM mimetype WHERE str_mimetype = ?",
)?;
match select_statement.query_row(
(&string_mediatype,),
|row| row.get(0)
) {
Ok(res) => { return Ok(res); }
Err(e) => {
if !autoadd {
return Err(e.into());
}
}
}
self.transaction.execute(
"INSERT INTO mimetype (
mime_type,
mime_subtype,
mime_suffix,
charset,
str_mimetype
) VALUES (
?,?,?,?,?
)",(
mimetype.ty().to_string(),
mimetype.subty().to_string(),
mimetype.suffix().map(|s| s.to_string()),
mimetype.get_param(
mediatype::Name::new("charset").expect("Static Mime-Parameter name")
).map(|s| s.to_string()),
string_mediatype,
)
)?;
let mimetype_id = MimetypeId::new(self.transaction.last_insert_rowid());
let mut insert_mime_params_statement = self.transaction.prepare(
"INSERT INTO mime_parameter (
mimetype_id,
key,
value
) VALUES (?,?,?)
")?;
for p in mimetype.params() {
if p.0 != "charset" {
insert_mime_params_statement.execute((
mimetype_id,
p.0.as_str(),
p.1.as_str()
))?;
}
}
return Ok(mimetype_id);
}
}