#![allow(deprecated)]
mod builder;
pub use builder::Builder;
#[cfg(feature = "core")]
pub use libsql_sys::{Cipher, EncryptionConfig};
use crate::{Connection, Result};
#[cfg(any(feature = "remote", feature = "sync"))]
use base64::{engine::general_purpose, Engine};
use std::fmt;
use std::sync::atomic::AtomicU64;
cfg_core! {
bitflags::bitflags! {
/// Flags controlling how a local database file is opened.
///
/// Each flag takes its value from the corresponding `SQLITE_OPEN_*` constant
/// exported by `libsql_sys::ffi`; the struct is `#[repr(C)]` over a C `int`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(C)]
pub struct OpenFlags: ::std::os::raw::c_int {
/// Same value as `libsql_sys::ffi::SQLITE_OPEN_READONLY`.
const SQLITE_OPEN_READ_ONLY = libsql_sys::ffi::SQLITE_OPEN_READONLY;
/// Same value as `libsql_sys::ffi::SQLITE_OPEN_READWRITE`.
const SQLITE_OPEN_READ_WRITE = libsql_sys::ffi::SQLITE_OPEN_READWRITE;
/// Same value as `libsql_sys::ffi::SQLITE_OPEN_CREATE`.
const SQLITE_OPEN_CREATE = libsql_sys::ffi::SQLITE_OPEN_CREATE;
}
}
/// The default flag set is `SQLITE_OPEN_READ_WRITE | SQLITE_OPEN_CREATE`.
impl Default for OpenFlags {
#[inline]
fn default() -> OpenFlags {
OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
}
}
}
cfg_replication_or_sync! {
/// Numeric type used for replication frame numbers.
pub type FrameNo = u64;

/// Result value produced by the sync operations on a database.
#[derive(Debug)]
#[allow(dead_code)]
pub struct Replicated {
    /// Frame number carried by this result, if one is available.
    pub(crate) frame_no: Option<FrameNo>,
    /// Frame count carried by this result.
    pub(crate) frames_synced: usize,
}

impl Replicated {
    /// Returns the frame number stored in this result, or `None` when absent.
    #[allow(dead_code)]
    pub fn frame_no(&self) -> Option<FrameNo> {
        let Self { frame_no, .. } = self;
        *frame_no
    }

    /// Returns the `frames_synced` count stored in this result.
    #[allow(dead_code)]
    pub fn frames_synced(&self) -> usize {
        let Self { frames_synced, .. } = self;
        *frames_synced
    }
}
}
cfg_sync! {
/// Selects which sync protocol to use.
///
/// `Auto` is the `Default` value.
#[derive(Default)]
pub enum SyncProtocol {
/// Default choice. NOTE(review): presumably lets the implementation pick
/// between `V1` and `V2` — confirm against the code that consumes this enum.
#[default]
Auto,
/// Protocol version 1.
V1,
/// Protocol version 2.
V2,
}
}
/// Backend of a [`Database`]; which variants exist depends on the enabled
/// cargo features, and `Database::connect` dispatches on this value.
enum DbType {
/// An already-open local database (created from `":memory:"` by
/// `Database::open_in_memory`).
#[cfg(feature = "core")]
Memory { db: crate::local::Database },
/// A local database file. Only the parameters are stored here; the file is
/// opened each time `Database::connect` is called.
#[cfg(feature = "core")]
File {
// Path handed to `crate::local::Database::open` / `open_raw`.
path: String,
// Flags handed to the open call.
flags: OpenFlags,
// When `Some`, `connect` applies this cipher and key to each new connection.
encryption_config: Option<EncryptionConfig>,
// When true, `connect` uses the unsafe `open_raw` instead of `open`.
skip_safety_assert: bool,
},
/// A local database kept in sync through replication.
#[cfg(feature = "replication")]
Sync {
db: crate::local::Database,
// When `Some`, `connect` applies this cipher and key to each new connection.
encryption_config: Option<EncryptionConfig>,
},
/// A local database with offline sync support.
#[cfg(feature = "sync")]
Offline {
db: crate::local::Database,
// When true, `connect` returns a `SyncedConnection` that also holds an
// HTTP connection to `url`; otherwise a plain local connection.
remote_writes: bool,
// Forwarded unchanged to `SyncedConnection::read_your_writes`.
read_your_writes: bool,
url: String,
auth_token: String,
connector: crate::util::ConnectorService,
// Held only for ownership; never read in this file.
// NOTE(review): presumably aborts a background task on drop — confirm in `crate::sync`.
_bg_abort: Option<std::sync::Arc<crate::sync::DropAbort>>,
remote_encryption: Option<EncryptionContext>,
},
/// A database reached purely over HTTP.
#[cfg(feature = "remote")]
Remote {
url: String,
auth_token: String,
connector: crate::util::ConnectorService,
// Optional values passed as `Option<&str>` to `HttpConnection::new_with_connector`.
version: Option<String>,
namespace: Option<String>,
remote_encryption: Option<EncryptionContext>,
},
}
/// Debug output is only the variant name; no field contents (paths, tokens,
/// keys) are ever printed.
impl fmt::Debug for DbType {
    // With every feature enabled the wildcard arm can never match.
    #[allow(unreachable_patterns)]
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            #[cfg(feature = "core")]
            Self::Memory { .. } => "Memory",
            #[cfg(feature = "core")]
            Self::File { .. } => "File",
            #[cfg(feature = "replication")]
            Self::Sync { .. } => "Sync",
            #[cfg(feature = "sync")]
            Self::Offline { .. } => "Offline",
            #[cfg(feature = "remote")]
            Self::Remote { .. } => "Remote",
            _ => "no database type set",
        };
        f.write_str(label)
    }
}
/// Handle to a database; use `connect` to obtain a [`Connection`].
///
/// The concrete backend (in-memory, file, replicated, offline or remote) is
/// held in the private `db_type` field.
pub struct Database {
// Backend selector and its parameters.
db_type: DbType,
// Shared counter handed to every `RemoteConnection` created by `connect`
// for `Sync` databases; a stored value of 0 is reported as `None` by
// `max_write_replication_index()`.
#[allow(dead_code)]
max_write_replication_index: std::sync::Arc<AtomicU64>,
}
cfg_core! {
impl Database {
/// Opens an in-memory database (`":memory:"`) with the default [`OpenFlags`].
///
/// # Errors
///
/// Propagates any error returned by `crate::local::Database::open`.
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub fn open_in_memory() -> Result<Self> {
    let flags = OpenFlags::default();
    let memory_db = crate::local::Database::open(":memory:", flags)?;
    let db_type = DbType::Memory { db: memory_db };
    Ok(Database {
        db_type,
        max_write_replication_index: Default::default(),
    })
}
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub fn open(db_path: impl Into<String>) -> Result<Database> {
Database::open_with_flags(db_path, OpenFlags::default())
}
/// Creates a file-backed database handle for `db_path` with explicit `flags`.
///
/// Only the path and flags are recorded here; the file itself is opened when
/// `connect` is called. The handle has no encryption configuration and does
/// not skip the safety assert.
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub fn open_with_flags(db_path: impl Into<String>, flags: OpenFlags) -> Result<Database> {
    let path: String = db_path.into();
    let db_type = DbType::File {
        path,
        flags,
        encryption_config: None,
        skip_safety_assert: false,
    };
    Ok(Database {
        db_type,
        max_write_replication_index: Default::default(),
    })
}
}
}
cfg_replication! {
use crate::Error;
impl Database {
/// Opens a `Sync` database at `db_path` via
/// `crate::local::Database::open_local_sync`, using the default [`OpenFlags`].
///
/// `encryption_config` is passed to the open call and also kept so that it
/// can be applied to connections created later.
///
/// # Errors
///
/// Propagates any error from the underlying open call.
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub async fn open_with_local_sync(
    db_path: impl Into<String>,
    encryption_config: Option<EncryptionConfig>
) -> Result<Database> {
    let flags = OpenFlags::default();
    let local = crate::local::Database::open_local_sync(
        db_path,
        flags,
        encryption_config.clone(),
    )
    .await?;
    let db_type = DbType::Sync { db: local, encryption_config };
    Ok(Database {
        db_type,
        max_write_replication_index: Default::default(),
    })
}
/// Like `open_with_local_sync_remote_writes_connector`, but uses the
/// connector returned by this module's `connector()` function.
///
/// # Errors
///
/// Fails if the connector cannot be built or if the delegated open fails.
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub async fn open_with_local_sync_remote_writes(
    db_path: impl Into<String>,
    endpoint: String,
    auth_token: String,
    encryption_config: Option<EncryptionConfig>,
) -> Result<Database> {
    let default_connector = connector()?;
    let opening = Self::open_with_local_sync_remote_writes_connector(
        db_path,
        endpoint,
        auth_token,
        default_connector,
        encryption_config,
    );
    opening.await
}
/// Opens a `Sync` database at `db_path` through
/// `crate::local::Database::open_local_sync_remote_writes`, talking to
/// `endpoint` with `auth_token` over a caller-supplied `connector`.
///
/// The connector is adapted so that its errors become boxed errors and its
/// responses become `Box<dyn Socket>`, then wrapped in a `ConnectorService`.
///
/// # Errors
///
/// Propagates any error from the underlying open call.
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub async fn open_with_local_sync_remote_writes_connector<C>(
    db_path: impl Into<String>,
    endpoint: String,
    auth_token: String,
    connector: C,
    encryption_config: Option<EncryptionConfig>,
) -> Result<Database>
where
    C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
    C::Response: crate::util::Socket,
    C::Future: Send + 'static,
    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    use tower::ServiceExt;

    let boxed = connector
        .map_err(|err| err.into())
        .map_response(|socket| Box::new(socket) as Box<dyn crate::util::Socket>);
    let service = crate::util::ConnectorService::new(boxed);

    // The `None` arguments fill optional parameters of the underlying call
    // that this constructor does not expose.
    let local = crate::local::Database::open_local_sync_remote_writes(
        service,
        db_path.into(),
        endpoint,
        auth_token,
        None,
        OpenFlags::default(),
        encryption_config.clone(),
        None,
        None,
    )
    .await?;

    let db_type = DbType::Sync { db: local, encryption_config };
    Ok(Database {
        db_type,
        max_write_replication_index: Default::default(),
    })
}
/// Opens a remotely-synced database using the connector from `connector()`,
/// with `read_your_writes` disabled.
///
/// # Errors
///
/// Fails if the connector cannot be built or if the delegated open fails.
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub async fn open_with_remote_sync(
    db_path: impl Into<String>,
    url: impl Into<String>,
    token: impl Into<String>,
    encryption_config: Option<EncryptionConfig>,
) -> Result<Database> {
    let default_connector = connector()?;
    let read_your_writes = false;
    Self::open_with_remote_sync_connector(
        db_path,
        url,
        token,
        default_connector,
        read_your_writes,
        encryption_config,
    )
    .await
}
/// Opens a remotely-synced database using the connector from `connector()`,
/// with `read_your_writes` enabled.
///
/// # Errors
///
/// Fails if the connector cannot be built or if the delegated open fails.
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub async fn open_with_remote_sync_consistent(
    db_path: impl Into<String>,
    url: impl Into<String>,
    token: impl Into<String>,
    encryption_config: Option<EncryptionConfig>,
) -> Result<Database> {
    let default_connector = connector()?;
    let read_your_writes = true;
    Self::open_with_remote_sync_connector(
        db_path,
        url,
        token,
        default_connector,
        read_your_writes,
        encryption_config,
    )
    .await
}
/// Opens a remotely-synced database over a caller-supplied `connector`.
///
/// Delegates to the internal constructor with no `version` and no
/// `sync_interval`.
///
/// # Errors
///
/// Propagates any error from the delegated open.
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub async fn open_with_remote_sync_connector<C>(
    db_path: impl Into<String>,
    url: impl Into<String>,
    token: impl Into<String>,
    connector: C,
    read_your_writes: bool,
    encryption_config: Option<EncryptionConfig>,
) -> Result<Database>
where
    C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
    C::Response: crate::util::Socket,
    C::Future: Send + 'static,
    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let version = None;
    let sync_interval = None;
    let opening = Self::open_with_remote_sync_connector_internal(
        db_path,
        url,
        token,
        connector,
        version,
        read_your_writes,
        encryption_config,
        sync_interval,
    );
    opening.await
}
/// Opens a remotely-synced database using the connector from `connector()`,
/// forwarding `version`, `read_your_writes`, `encryption_config` and
/// `sync_interval` unchanged to the connector-taking internal constructor.
///
/// # Errors
///
/// Fails if the connector cannot be built or if the delegated open fails.
#[doc(hidden)]
pub async fn open_with_remote_sync_internal(
    db_path: impl Into<String>,
    url: impl Into<String>,
    token: impl Into<String>,
    version: Option<String>,
    read_your_writes: bool,
    encryption_config: Option<EncryptionConfig>,
    sync_interval: Option<std::time::Duration>,
) -> Result<Database> {
    let default_connector = connector()?;
    let opening = Self::open_with_remote_sync_connector_internal(
        db_path,
        url,
        token,
        default_connector,
        version,
        read_your_writes,
        encryption_config,
        sync_interval,
    );
    opening.await
}
/// Shared implementation behind the `open_with_remote_sync*` constructors.
///
/// Adapts `connector` (errors boxed, responses turned into `Box<dyn Socket>`),
/// wraps it in a `ConnectorService`, and opens the database through
/// `crate::local::Database::open_http_sync_internal`. The result is a `Sync`
/// database that remembers `encryption_config` for later connections.
///
/// # Errors
///
/// Propagates any error from the underlying open call.
#[doc(hidden)]
async fn open_with_remote_sync_connector_internal<C>(
    db_path: impl Into<String>,
    url: impl Into<String>,
    token: impl Into<String>,
    connector: C,
    version: Option<String>,
    read_your_writes: bool,
    encryption_config: Option<EncryptionConfig>,
    sync_interval: Option<std::time::Duration>,
) -> Result<Database>
where
    C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
    C::Response: crate::util::Socket,
    C::Future: Send + 'static,
    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    use tower::ServiceExt;

    let boxed = connector
        .map_err(|err| err.into())
        .map_response(|socket| Box::new(socket) as Box<dyn crate::util::Socket>);
    let service = crate::util::ConnectorService::new(boxed);

    // The trailing `None`s fill optional parameters of the underlying call
    // that none of the public constructors expose.
    let local = crate::local::Database::open_http_sync_internal(
        service,
        db_path.into(),
        url.into(),
        token.into(),
        version,
        read_your_writes,
        encryption_config.clone(),
        sync_interval,
        None,
        None,
    )
    .await?;

    let db_type = DbType::Sync { db: local, encryption_config };
    Ok(Database {
        db_type,
        max_write_replication_index: Default::default(),
    })
}
/// Synchronises this database and reports the outcome.
///
/// - `Sync` databases delegate to the local database's `sync()`.
/// - `Offline` databases without remote writes delegate to `sync_offline()`.
/// - `Offline` databases with remote writes lock the sync context, bootstrap
///   the database, open a local connection and pull through it.
///
/// # Errors
///
/// Returns `Error::SyncNotSupported` for every other database type, and
/// propagates errors from the delegated operations.
///
/// # Panics
///
/// Panics for an `Offline` database with remote writes whose `sync_ctx` is `None`.
pub async fn sync(&self) -> Result<Replicated> {
    match &self.db_type {
        #[cfg(feature = "replication")]
        DbType::Sync { db, .. } => db.sync().await,
        #[cfg(feature = "sync")]
        DbType::Offline { db, remote_writes, .. } => {
            if *remote_writes {
                let ctx_handle = db.sync_ctx.as_ref().unwrap();
                // The guard stays held across bootstrap and pull.
                let mut guard = ctx_handle.lock().await;
                crate::sync::bootstrap_db(&mut guard).await?;
                let local = db.connect()?;
                crate::sync::try_pull(&mut guard, &local).await
            } else {
                db.sync_offline().await
            }
        }
        _ => Err(Error::SyncNotSupported(format!("{:?}", self.db_type))),
    }
}
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<Replicated> {
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
db.sync_until(replication_index).await
} else {
Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
}
}
pub async fn sync_frames(&self, frames: crate::replication::Frames) -> Result<Option<FrameNo>> {
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
db.sync_frames(frames).await
} else {
Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
}
}
pub async fn flush_replicator(&self) -> Result<Option<FrameNo>> {
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
db.flush_replicator().await
} else {
Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
}
}
pub async fn replication_index(&self) -> Result<Option<FrameNo>> {
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
db.replication_index().await
} else {
Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
}
}
pub fn freeze(self) -> Result<Database> {
match self.db_type {
DbType::Sync { db, .. } => {
let path = db.path().to_string();
Ok(Database {
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_safety_assert: false },
max_write_replication_index: Default::default(),
})
}
t => Err(Error::FreezeNotSupported(format!("{:?}", t)))
}
}
pub fn max_write_replication_index(&self) -> Option<FrameNo> {
let index = self
.max_write_replication_index
.load(std::sync::atomic::Ordering::SeqCst);
if index == 0 {
None
} else {
Some(index)
}
}
}
}
// Empty inherent impl: it adds no methods. The constructors and operations
// of `Database` live in the feature-gated impl blocks around it.
impl Database {}
cfg_remote! {
impl Database {
/// Creates a remote database handle for `url`, authenticated with
/// `auth_token`, using the connector from `connector()` and no version.
///
/// # Errors
///
/// Fails if the connector cannot be built.
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub fn open_remote(url: impl Into<String>, auth_token: impl Into<String>) -> Result<Self> {
    let default_connector = connector()?;
    let version = None;
    Self::open_remote_with_connector_internal(url, auth_token, default_connector, version)
}
/// Like `open_remote`, but also records a `version` string on the handle.
///
/// # Errors
///
/// Fails if the connector cannot be built.
#[doc(hidden)]
pub fn open_remote_internal(
    url: impl Into<String>,
    auth_token: impl Into<String>,
    version: impl Into<String>,
) -> Result<Self> {
    let default_connector = connector()?;
    let version: String = version.into();
    Self::open_remote_with_connector_internal(url, auth_token, default_connector, Some(version))
}
/// Creates a remote database handle that connects through a caller-supplied
/// `connector`, with no version recorded.
#[deprecated = "Use the new `Builder` to construct `Database`"]
pub fn open_remote_with_connector<C>(
    url: impl Into<String>,
    auth_token: impl Into<String>,
    connector: C,
) -> Result<Self>
where
    C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
    C::Response: crate::util::Socket,
    C::Future: Send + 'static,
    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let version = None;
    Self::open_remote_with_connector_internal(url, auth_token, connector, version)
}
/// Shared implementation behind the `open_remote*` constructors.
///
/// Adapts `connector` (errors boxed, responses turned into `Box<dyn Socket>`),
/// wraps it in a `ConnectorService`, and stores it with `url`, `auth_token`
/// and `version` in a `Remote` database. No namespace or remote encryption
/// is set. As written, this function always returns `Ok`.
#[doc(hidden)]
fn open_remote_with_connector_internal<C>(
    url: impl Into<String>,
    auth_token: impl Into<String>,
    connector: C,
    version: Option<String>,
) -> Result<Self>
where
    C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
    C::Response: crate::util::Socket,
    C::Future: Send + 'static,
    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    use tower::ServiceExt;

    let boxed = connector
        .map_err(|err| err.into())
        .map_response(|socket| Box::new(socket) as Box<dyn crate::util::Socket>);

    let db_type = DbType::Remote {
        url: url.into(),
        auth_token: auth_token.into(),
        connector: crate::util::ConnectorService::new(boxed),
        version,
        namespace: None,
        remote_encryption: None,
    };
    Ok(Database {
        db_type,
        max_write_replication_index: Default::default(),
    })
}
}
}
impl Database {
/// Opens a new [`Connection`] to this database.
///
/// What is created depends on the backend:
///
/// - `Memory`: a connection on the already-open in-memory database.
/// - `File`: the file is opened with the stored flags on every call, the
///   stored encryption configuration (if any) is applied, and a local
///   connection is returned.
/// - `Sync`: a local connection (with encryption applied, if configured)
///   wrapped in a `RemoteConnection` that shares this database's
///   write-replication counter.
/// - `Offline`: the local database is bootstrapped first; the result is a
///   `SyncedConnection` when remote writes are enabled, otherwise a plain
///   local connection.
/// - `Remote`: an `HttpConnection` built from the stored URL, token,
///   connector, version, namespace and remote-encryption context.
///
/// # Errors
///
/// Returns `Error::Misuse` when an encryption configuration is present but
/// the `encryption` feature is not compiled in (checked before the database
/// is opened or connected to, so no file is created in that case), or when
/// setting the cipher or key on the connection fails. Errors from opening,
/// bootstrapping or connecting to the local database are propagated.
///
/// # Panics
///
/// For `Offline` databases this panics if the temporary runtime cannot be
/// built, or if remote writes are enabled while `sync_ctx` is `None`.
/// `tokio::task::block_in_place` also has its own documented panic
/// conditions (e.g. when called from a current-thread runtime). Hits
/// `unreachable!` when no database variant matches.
#[allow(unreachable_patterns)]
pub fn connect(&self) -> Result<Connection> {
    match &self.db_type {
        #[cfg(feature = "core")]
        DbType::Memory { db } => {
            use crate::local::impls::LibsqlConnection;

            let conn = db.connect()?;
            let conn = std::sync::Arc::new(LibsqlConnection { conn });
            Ok(Connection { conn })
        }

        #[cfg(feature = "core")]
        DbType::File {
            path,
            flags,
            encryption_config,
            skip_safety_assert,
        } => {
            use crate::local::impls::LibsqlConnection;

            // Reject unsupported encryption requests before touching the
            // filesystem: opening first could create the database file
            // without the encryption the caller asked for.
            if !cfg!(feature = "encryption") && encryption_config.is_some() {
                return Err(crate::Error::Misuse(
                    "Encryption is not enabled: enable the `encryption` feature in order to enable encryption-at-rest".to_string(),
                ));
            }

            let db = if !skip_safety_assert {
                crate::local::Database::open(path, *flags)?
            } else {
                // NOTE(review): `open_raw` is unsafe and is only used when the
                // handle was built with `skip_safety_assert`; its exact safety
                // contract lives in `crate::local` — confirm there.
                unsafe { crate::local::Database::open_raw(path, *flags)? }
            };

            let conn = db.connect()?;

            #[cfg(feature = "encryption")]
            if let Some(cfg) = encryption_config {
                // The cipher must be selected before the key is supplied.
                if unsafe {
                    libsql_sys::connection::set_encryption_cipher(conn.raw, cfg.cipher_id())
                } == -1
                {
                    return Err(crate::Error::Misuse(
                        "failed to set encryption cipher".to_string(),
                    ));
                }
                if unsafe {
                    libsql_sys::connection::set_encryption_key(conn.raw, &cfg.encryption_key)
                } != crate::ffi::SQLITE_OK
                {
                    return Err(crate::Error::Misuse(
                        "failed to set encryption key".to_string(),
                    ));
                }
            }

            let conn = std::sync::Arc::new(LibsqlConnection { conn });
            Ok(Connection { conn })
        }

        #[cfg(feature = "replication")]
        DbType::Sync {
            db,
            encryption_config,
        } => {
            use crate::local::impls::LibsqlConnection;

            // Same fail-fast validation as for file databases: do not create
            // a connection that cannot be keyed as requested.
            if !cfg!(feature = "encryption") && encryption_config.is_some() {
                return Err(crate::Error::Misuse(
                    "Encryption is not enabled: enable the `encryption` feature in order to enable encryption-at-rest".to_string(),
                ));
            }

            let conn = db.connect()?;

            #[cfg(feature = "encryption")]
            if let Some(cfg) = encryption_config {
                // The cipher must be selected before the key is supplied.
                if unsafe {
                    libsql_sys::connection::set_encryption_cipher(conn.raw, cfg.cipher_id())
                } == -1
                {
                    return Err(crate::Error::Misuse(
                        "failed to set encryption cipher".to_string(),
                    ));
                }
                if unsafe {
                    libsql_sys::connection::set_encryption_key(conn.raw, &cfg.encryption_key)
                } != crate::ffi::SQLITE_OK
                {
                    return Err(crate::Error::Misuse(
                        "failed to set encryption key".to_string(),
                    ));
                }
            }

            let local = LibsqlConnection { conn };
            let writer = local.conn.new_connection_writer();
            let remote = crate::replication::RemoteConnection::new(
                local,
                writer,
                self.max_write_replication_index.clone(),
            );
            let conn = std::sync::Arc::new(remote);
            Ok(Connection { conn })
        }

        #[cfg(feature = "sync")]
        DbType::Offline {
            db,
            remote_writes,
            read_your_writes,
            url,
            auth_token,
            connector,
            remote_encryption,
            ..
        } => {
            use crate::{
                hrana::connection::HttpConnection, local::impls::LibsqlConnection,
                replication::connection::State, sync::connection::SyncedConnection,
            };
            use tokio::sync::Mutex;

            // `connect` is synchronous, so the async bootstrap is driven to
            // completion on a temporary current-thread runtime.
            tokio::task::block_in_place(move || {
                let rt = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                    .unwrap();
                rt.block_on(async {
                    db.bootstrap_db().await?;
                    Ok::<(), crate::Error>(())
                })
            })?;

            let local = db.connect()?;

            if *remote_writes {
                let synced = SyncedConnection {
                    local,
                    remote: HttpConnection::new_with_connector(
                        url.clone(),
                        auth_token.clone(),
                        connector.clone(),
                        None,
                        None,
                        remote_encryption.clone(),
                    ),
                    read_your_writes: *read_your_writes,
                    context: db.sync_ctx.clone().unwrap(),
                    state: std::sync::Arc::new(Mutex::new(State::Init)),
                };
                let conn = std::sync::Arc::new(synced);
                return Ok(Connection { conn });
            }

            let conn = std::sync::Arc::new(LibsqlConnection { conn: local });
            Ok(Connection { conn })
        }

        #[cfg(feature = "remote")]
        DbType::Remote {
            url,
            auth_token,
            connector,
            version,
            namespace,
            remote_encryption,
        } => {
            let conn = std::sync::Arc::new(
                crate::hrana::connection::HttpConnection::new_with_connector(
                    url,
                    auth_token,
                    connector.clone(),
                    version.as_deref(),
                    namespace.as_deref(),
                    remote_encryption.clone(),
                ),
            );
            Ok(Connection { conn })
        }

        _ => unreachable!("no database type set"),
    }
}
}
/// Builds the default HTTPS connector used by the convenience constructors.
///
/// The underlying HTTP connector has `enforce_http` turned off and
/// `TCP_NODELAY` turned on; the TLS layer uses the platform's native root
/// certificates, accepts both `https` and `http`, and enables HTTP/1.
///
/// # Errors
///
/// Returns `Error::InvalidTlsConfiguration` when the native roots cannot be loaded.
#[cfg(any(
    all(feature = "tls", feature = "replication"),
    all(feature = "tls", feature = "remote"),
    all(feature = "tls", feature = "sync")
))]
fn connector() -> Result<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>> {
    let mut tcp = hyper::client::HttpConnector::new();
    tcp.enforce_http(false);
    tcp.set_nodelay(true);

    let with_roots = hyper_rustls::HttpsConnectorBuilder::new()
        .with_native_roots()
        .map_err(crate::Error::InvalidTlsConfiguration)?;
    let https = with_roots
        .https_or_http()
        .enable_http1()
        .wrap_connector(tcp);
    Ok(https)
}
/// Stand-in for the default connector when the `tls` feature is disabled.
///
/// # Panics
///
/// Always panics: without `tls` there is no built-in connector, so callers
/// must use one of the constructors that accept their own connector.
#[cfg(any(
all(not(feature = "tls"), feature = "replication"),
all(not(feature = "tls"), feature = "remote"),
all(not(feature = "tls"), feature = "sync")
))]
fn connector() -> Result<hyper::client::HttpConnector> {
panic!("The `tls` feature is disabled, you must provide your own http connector");
}
/// Debug output is the bare struct name; no fields are printed.
impl std::fmt::Debug for Database {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("Database");
        repr.finish()
    }
}
/// An encryption key, held either as text or as raw bytes.
#[cfg(any(feature = "remote", feature = "sync"))]
#[derive(Debug, Clone)]
pub enum EncryptionKey {
/// A key already in textual form; `as_string` returns it unchanged.
Base64Encoded(String),
/// Raw key bytes; `as_string` encodes them with standard base64.
Bytes(Vec<u8>),
}
#[cfg(any(feature = "remote", feature = "sync"))]
impl EncryptionKey {
    /// Returns the key as a string.
    ///
    /// A `Base64Encoded` key is returned as a copy of the stored text; a
    /// `Bytes` key is encoded with the standard base64 engine.
    pub fn as_string(&self) -> String {
        match self {
            Self::Base64Encoded(encoded) => encoded.to_owned(),
            Self::Bytes(raw) => general_purpose::STANDARD.encode(raw),
        }
    }
}
/// Encryption settings attached to remote (HTTP) connections; `connect`
/// passes a clone of this to `HttpConnection::new_with_connector`.
#[cfg(any(feature = "remote", feature = "sync"))]
#[derive(Debug, Clone)]
pub struct EncryptionContext {
/// The key to use.
pub key: EncryptionKey,
}