#![deny(
bad_style,
bare_trait_objects,
const_err,
dead_code,
improper_ctypes,
legacy_directory_ownership,
missing_debug_implementations,
missing_docs,
no_mangle_generic_items,
non_shorthand_field_patterns,
overflowing_literals,
path_statements,
patterns_in_fns_without_body,
plugin_as_library,
private_in_public,
safe_extern_statics,
trivial_numeric_casts,
unconditional_recursion,
unions_with_drop_fields,
unsafe_code,
unused,
unused_allocation,
unused_comparisons,
unused_extern_crates,
unused_import_braces,
unused_parens,
unused_qualifications,
unused_results,
while_true
)]
mod cmd;
mod run;
use crate::cmd::Command;
use futures::{executor::block_on, prelude::*};
use g1_common::{
nameless::NamelessQuery, utils::file_to_stream, Atom, Bytes, Connection, Hash, Mime,
};
use sha2::{Digest, Sha256};
use std::{
os::unix::ffi::OsStrExt,
path::PathBuf,
pin::Pin,
sync::Arc,
thread::{spawn, JoinHandle},
};
use thiserror::Error;
use tokio::{
fs::{create_dir_all, rename, File},
prelude::*,
sync::{
mpsc::{channel, Sender},
oneshot, Mutex,
},
task::spawn_blocking,
};
use uuid::Uuid;
#[derive(Debug)]
pub struct SqliteConnection {
join: JoinHandle<()>,
path: PathBuf,
send: Mutex<Sender<Command>>,
}
const INITDB: &str = r#"
create table if not exists atoms
( atom text not null
, constraint atomUnique unique (atom)
);
create table if not exists names
( atom text not null
, ns text not null
, title text not null
, constraint nameAtom foreign key (atom) references atoms(atom)
, constraint nameUnique unique (ns, title)
);
create table if not exists edges
( edge_from text not null
, edge_to text not null
, label text not null
, constraint edgeFromAtom foreign key (edge_from) references atoms(atom)
, constraint edgeToAtom foreign key (edge_to) references atoms(atom)
, constraint edgeUnique unique (edge_from, edge_to, label)
);
create table if not exists tags
( atom text not null
, key text not null
, value text not null
, constraint tagAtom foreign key (atom) references atoms(atom)
, constraint tagUnique unique (atom, key)
);
create table if not exists blobs
( atom text not null
, kind text not null
, mime text not null
, hash text not null
, constraint blobAtom foreign key (atom) references atoms(atom)
, constraint blobUnique unique (atom, kind, mime)
);"#;
impl SqliteConnection {
pub async fn open(path: PathBuf) -> Result<SqliteConnection, SqliteError> {
create_dir_all(path.join("blobs")).await?;
create_dir_all(path.join("tmp")).await?;
let mut conn_path = path.clone();
conn_path.push("g1.db");
let conn = spawn_blocking(move || -> rusqlite::Result<rusqlite::Connection> {
let conn = rusqlite::Connection::open(conn_path)?;
conn.pragma_update(None, "foreign_keys", &true)?;
conn.execute_batch(INITDB)?;
Ok(conn)
})
.await
.map_err(tokio::io::Error::from)??;
let (send, mut recv) = channel::<Command>(1);
let join = spawn(move || {
let mut conn = conn;
while let Some(cmd) = block_on(recv.recv()) {
cmd.run(&mut conn);
}
for _ in 0..3 {
match conn.close() {
Ok(()) => break,
Err((c, err)) => {
conn = c;
log::error!("Failed to close SQLite: {}", err);
}
}
}
});
Ok(SqliteConnection {
join,
path,
send: Mutex::new(send),
})
}
async fn send_command<F, T>(&self, make_command: F) -> Result<T, SqliteError>
where
F: FnOnce(oneshot::Sender<Result<T, SqliteError>>) -> Command,
{
let (send, recv) = oneshot::channel();
let mut send_send = self.send.lock().await;
send_send
.send(make_command(send))
.await
.map_err(|_| SqliteError::SQLitePanic)?;
recv.await.map_err(|_| SqliteError::SQLitePanic)?
}
}
#[async_trait::async_trait]
impl Connection for SqliteConnection {
type Error = SqliteError;
async fn create_atom(&self) -> Result<Atom, Self::Error> {
self.send_command(|send| Command::CreateAtom(send)).await
}
async fn delete_atom(&self, atom: Atom) -> Result<(), Self::Error> {
self.send_command(move |send| Command::DeleteAtom(atom, send))
.await
}
async fn create_name(
&self,
atom: Atom,
ns: &str,
title: &str,
upsert: bool,
) -> Result<(), Self::Error> {
self.send_command(move |send| {
Command::CreateName(atom, ns.to_string(), title.to_string(), upsert, send)
})
.await
}
async fn delete_name(&self, ns: &str, title: &str) -> Result<bool, Self::Error> {
self.send_command(move |send| Command::DeleteName(ns.to_string(), title.to_string(), send))
.await
}
async fn create_edge(&self, from: Atom, to: Atom, label: &str) -> Result<bool, Self::Error> {
self.send_command(move |send| Command::CreateEdge(from, to, label.to_string(), send))
.await
}
async fn delete_edge(&self, from: Atom, to: Atom, label: &str) -> Result<bool, Self::Error> {
self.send_command(move |send| Command::DeleteEdge(from, to, label.to_string(), send))
.await
}
async fn create_tag(
&self,
atom: Atom,
key: &str,
value: &str,
upsert: bool,
) -> Result<(), Self::Error> {
self.send_command(move |send| {
Command::CreateTag(atom, key.to_string(), value.to_string(), upsert, send)
})
.await
}
async fn delete_tag(&self, atom: Atom, key: &str) -> Result<bool, Self::Error> {
self.send_command(move |send| Command::DeleteTag(atom, key.to_string(), send))
.await
}
async fn create_blob(
&self,
atom: Atom,
kind: &str,
mime: Mime,
hash: Hash,
upsert: bool,
) -> Result<(), Self::Error> {
self.send_command(move |send| {
Command::CreateBlob(atom, kind.to_string(), mime, hash, upsert, send)
})
.await
}
async fn delete_blob(&self, atom: Atom, kind: &str, mime: Mime) -> Result<bool, Self::Error> {
self.send_command(move |send| Command::DeleteBlob(atom, kind.to_string(), mime, send))
.await
}
async fn fetch_blob(
&self,
hash: Hash,
) -> Result<Pin<Box<dyn Stream<Item = Result<Bytes, Self::Error>> + Send>>, Self::Error> {
let mut path = self.path.clone();
path.push("blobs");
path.push(hash.to_string());
Ok(file_to_stream(path).await?.map_err(|e| e.into()).boxed())
}
async fn store_blob(
&self,
mut data: Pin<Box<dyn Stream<Item = Result<Bytes, Self::Error>> + Send + 'static>>,
) -> Result<Hash, Self::Error> {
let mut tmp_path = self.path.clone();
tmp_path.push("tmp");
tmp_path.push(Uuid::new_v4().to_string());
let mut file = File::create(&tmp_path).await?;
let mut hasher = Sha256::new();
while let Some(r) = data.next().await {
let chunk = r?;
hasher.input(&chunk);
let _ = file.write(&chunk).await?;
}
file.sync_all().await?;
let hash = Hash::from_bytes(hasher.result().as_slice());
let mut path = self.path.clone();
path.push("blobs");
path.push(hash.to_string());
rename(tmp_path, &path).await?;
let _ = path.pop();
spawn_blocking(move || {
let path = path.as_os_str().as_bytes().as_ptr() as *const libc::c_char;
#[allow(unsafe_code)]
unsafe {
let errno = libc::__errno_location();
let fd = libc::open(path, libc::O_RDONLY);
if fd == -1 {
return Err(*errno);
}
if libc::fsync(fd) != 0 {
let _ = libc::close(fd);
return Err(*errno);
}
if libc::close(fd) != 0 {
return Err(*errno);
}
}
Ok(())
})
.await
.map_err(tokio::io::Error::from)?
.map_err(std::io::Error::from_raw_os_error)?;
Ok(hash)
}
async fn query(
&self,
limit: Option<usize>,
query: &NamelessQuery,
) -> Result<Vec<Vec<Arc<str>>>, Self::Error> {
self.send_command(move |send| Command::Query(limit, query.clone(), send))
.await
}
}
#[derive(Debug, Error)]
pub enum SqliteError {
#[error("IO error: {0}")]
IO(#[from] tokio::io::Error),
#[error("Invalid query: {0}")]
InvalidQuery(String),
#[error("SQLite error: {0}")]
SQLite(#[from] rusqlite::Error),
#[error("The SQLite thread panicked")]
SQLitePanic,
}
impl g1_common::Error for SqliteError {
fn invalid_query(msg: String) -> SqliteError {
SqliteError::InvalidQuery(msg)
}
}