1#![allow(deprecated)]
2
3mod builder;
4
5pub use builder::Builder;
6
7#[cfg(feature = "core")]
8pub use libsql_sys::{Cipher, EncryptionConfig};
9
10use crate::{Connection, Result};
11use std::fmt;
12use std::sync::atomic::AtomicU64;
13
14cfg_core! {
15 bitflags::bitflags! {
16 #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
19 #[repr(C)]
20 pub struct OpenFlags: ::std::os::raw::c_int {
21 const SQLITE_OPEN_READ_ONLY = libsql_sys::ffi::SQLITE_OPEN_READONLY;
22 const SQLITE_OPEN_READ_WRITE = libsql_sys::ffi::SQLITE_OPEN_READWRITE;
23 const SQLITE_OPEN_CREATE = libsql_sys::ffi::SQLITE_OPEN_CREATE;
24 }
25 }
26
27 impl Default for OpenFlags {
28 #[inline]
29 fn default() -> OpenFlags {
30 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
31 }
32 }
33}
34
35cfg_replication_or_sync! {
36
37 pub type FrameNo = u64;
38
39 #[derive(Debug)]
40 #[allow(dead_code)]
42 pub struct Replicated {
43 pub(crate) frame_no: Option<FrameNo>,
44 pub(crate) frames_synced: usize,
45 }
46
47 impl Replicated {
48 #[allow(dead_code)]
54 pub fn frame_no(&self) -> Option<FrameNo> {
55 self.frame_no
56 }
57
58 #[allow(dead_code)]
62 pub fn frames_synced(&self) -> usize {
63 self.frames_synced
64 }
65 }
66
67}
68
69cfg_sync! {
70 #[derive(Default)]
71 pub enum SyncProtocol {
72 #[default]
73 Auto,
74 V1,
75 V2,
76 }
77}
78
79enum DbType {
80 #[cfg(feature = "core")]
81 Memory { db: crate::local::Database },
82 #[cfg(feature = "core")]
83 File {
84 path: String,
85 flags: OpenFlags,
86 encryption_config: Option<EncryptionConfig>,
87 skip_saftey_assert: bool,
88 },
89 #[cfg(feature = "replication")]
90 Sync {
91 db: crate::local::Database,
92 encryption_config: Option<EncryptionConfig>,
93 },
94 #[cfg(feature = "sync")]
95 Offline {
96 db: crate::local::Database,
97 remote_writes: bool,
98 read_your_writes: bool,
99 url: String,
100 auth_token: String,
101 connector: crate::util::ConnectorService,
102 _bg_abort: Option<std::sync::Arc<crate::sync::DropAbort>>,
103 },
104 #[cfg(feature = "remote")]
105 Remote {
106 url: String,
107 auth_token: String,
108 connector: crate::util::ConnectorService,
109 version: Option<String>,
110 namespace: Option<String>,
111 },
112}
113
114impl fmt::Debug for DbType {
115 #[allow(unreachable_patterns)]
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 match self {
118 #[cfg(feature = "core")]
119 Self::Memory { .. } => write!(f, "Memory"),
120 #[cfg(feature = "core")]
121 Self::File { .. } => write!(f, "File"),
122 #[cfg(feature = "replication")]
123 Self::Sync { .. } => write!(f, "Sync"),
124 #[cfg(feature = "sync")]
125 Self::Offline { .. } => write!(f, "Offline"),
126 #[cfg(feature = "remote")]
127 Self::Remote { .. } => write!(f, "Remote"),
128 _ => write!(f, "no database type set"),
129 }
130 }
131}
132
133pub struct Database {
136 db_type: DbType,
137 #[allow(dead_code)]
139 max_write_replication_index: std::sync::Arc<AtomicU64>,
140}
141
142cfg_core! {
143 impl Database {
144 #[deprecated = "Use the new `Builder` to construct `Database`"]
146 pub fn open_in_memory() -> Result<Self> {
147 let db = crate::local::Database::open(":memory:", OpenFlags::default())?;
148
149 Ok(Database {
150 db_type: DbType::Memory { db },
151 max_write_replication_index: Default::default(),
152 })
153 }
154
155 #[deprecated = "Use the new `Builder` to construct `Database`"]
157 pub fn open(db_path: impl Into<String>) -> Result<Database> {
158 Database::open_with_flags(db_path, OpenFlags::default())
159 }
160
161 #[deprecated = "Use the new `Builder` to construct `Database`"]
163 pub fn open_with_flags(db_path: impl Into<String>, flags: OpenFlags) -> Result<Database> {
164 Ok(Database {
165 db_type: DbType::File {
166 path: db_path.into(),
167 flags,
168 encryption_config: None,
169 skip_saftey_assert: false,
170 },
171 max_write_replication_index: Default::default(),
172 })
173 }
174 }
175}
176
177cfg_replication! {
178 use crate::Error;
179
180
181 impl Database {
182 #[deprecated = "Use the new `Builder` to construct `Database`"]
184 pub async fn open_with_local_sync(
185 db_path: impl Into<String>,
186 encryption_config: Option<EncryptionConfig>
187 ) -> Result<Database> {
188 let db = crate::local::Database::open_local_sync(
189 db_path,
190 OpenFlags::default(),
191 encryption_config.clone()
192 ).await?;
193
194 Ok(Database {
195 db_type: DbType::Sync { db, encryption_config },
196 max_write_replication_index: Default::default(),
197 })
198 }
199
200
201 #[deprecated = "Use the new `Builder` to construct `Database`"]
204 pub async fn open_with_local_sync_remote_writes(
205 db_path: impl Into<String>,
206 endpoint: String,
207 auth_token: String,
208 encryption_config: Option<EncryptionConfig>,
209 ) -> Result<Database> {
210 let https = connector()?;
211
212 Self::open_with_local_sync_remote_writes_connector(
213 db_path,
214 endpoint,
215 auth_token,
216 https,
217 encryption_config
218 ).await
219 }
220
221 #[deprecated = "Use the new `Builder` to construct `Database`"]
224 pub async fn open_with_local_sync_remote_writes_connector<C>(
225 db_path: impl Into<String>,
226 endpoint: String,
227 auth_token: String,
228 connector: C,
229 encryption_config: Option<EncryptionConfig>,
230 ) -> Result<Database>
231 where
232 C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
233 C::Response: crate::util::Socket,
234 C::Future: Send + 'static,
235 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
236 {
237 use tower::ServiceExt;
238
239 let svc = connector
240 .map_err(|e| e.into())
241 .map_response(|s| Box::new(s) as Box<dyn crate::util::Socket>);
242
243 let svc = crate::util::ConnectorService::new(svc);
244
245 let db = crate::local::Database::open_local_sync_remote_writes(
246 svc,
247 db_path.into(),
248 endpoint,
249 auth_token,
250 None,
251 OpenFlags::default(),
252 encryption_config.clone(),
253 None,
254 None,
255 ).await?;
256
257 Ok(Database {
258 db_type: DbType::Sync { db, encryption_config },
259 max_write_replication_index: Default::default(),
260 })
261 }
262
263 #[deprecated = "Use the new `Builder` to construct `Database`"]
265 pub async fn open_with_remote_sync(
266 db_path: impl Into<String>,
267 url: impl Into<String>,
268 token: impl Into<String>,
269 encryption_config: Option<EncryptionConfig>,
270 ) -> Result<Database> {
271 let https = connector()?;
272
273 Self::open_with_remote_sync_connector(db_path, url, token, https, false, encryption_config).await
274 }
275
276 #[deprecated = "Use the new `Builder` to construct `Database`"]
282 pub async fn open_with_remote_sync_consistent(
283 db_path: impl Into<String>,
284 url: impl Into<String>,
285 token: impl Into<String>,
286 encryption_config: Option<EncryptionConfig>,
287 ) -> Result<Database> {
288 let https = connector()?;
289
290 Self::open_with_remote_sync_connector(db_path, url, token, https, true, encryption_config).await
291 }
292
293 #[deprecated = "Use the new `Builder` to construct `Database`"]
296 pub async fn open_with_remote_sync_connector<C>(
297 db_path: impl Into<String>,
298 url: impl Into<String>,
299 token: impl Into<String>,
300 connector: C,
301 read_your_writes: bool,
302 encryption_config: Option<EncryptionConfig>,
303 ) -> Result<Database>
304 where
305 C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
306 C::Response: crate::util::Socket,
307 C::Future: Send + 'static,
308 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
309 {
310 Self::open_with_remote_sync_connector_internal(
311 db_path,
312 url,
313 token,
314 connector,
315 None,
316 read_your_writes,
317 encryption_config,
318 None
319 ).await
320 }
321
322 #[doc(hidden)]
323 pub async fn open_with_remote_sync_internal(
324 db_path: impl Into<String>,
325 url: impl Into<String>,
326 token: impl Into<String>,
327 version: Option<String>,
328 read_your_writes: bool,
329 encryption_config: Option<EncryptionConfig>,
330 sync_interval: Option<std::time::Duration>,
331 ) -> Result<Database> {
332 let https = connector()?;
333
334 Self::open_with_remote_sync_connector_internal(
335 db_path,
336 url,
337 token,
338 https,
339 version,
340 read_your_writes,
341 encryption_config,
342 sync_interval
343 ).await
344 }
345
346 #[doc(hidden)]
347 async fn open_with_remote_sync_connector_internal<C>(
348 db_path: impl Into<String>,
349 url: impl Into<String>,
350 token: impl Into<String>,
351 connector: C,
352 version: Option<String>,
353 read_your_writes: bool,
354 encryption_config: Option<EncryptionConfig>,
355 sync_interval: Option<std::time::Duration>,
356 ) -> Result<Database>
357 where
358 C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
359 C::Response: crate::util::Socket,
360 C::Future: Send + 'static,
361 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
362 {
363 use tower::ServiceExt;
364
365 let svc = connector
366 .map_err(|e| e.into())
367 .map_response(|s| Box::new(s) as Box<dyn crate::util::Socket>);
368
369 let svc = crate::util::ConnectorService::new(svc);
370
371 let db = crate::local::Database::open_http_sync_internal(
372 svc,
373 db_path.into(),
374 url.into(),
375 token.into(),
376 version,
377 read_your_writes,
378 encryption_config.clone(),
379 sync_interval,
380 None,
381 None
382 ).await?;
383
384 Ok(Database {
385 db_type: DbType::Sync { db, encryption_config },
386 max_write_replication_index: Default::default(),
387 })
388 }
389
390
391 pub async fn sync(&self) -> Result<Replicated> {
394 match &self.db_type {
395 #[cfg(feature = "replication")]
396 DbType::Sync { db, encryption_config: _ } => db.sync().await,
397 #[cfg(feature = "sync")]
398 DbType::Offline { db, remote_writes: false, .. } => db.sync_offline().await,
399 #[cfg(feature = "sync")]
400 DbType::Offline { db, remote_writes: true, .. } => {
401 let mut sync_ctx = db.sync_ctx.as_ref().unwrap().lock().await;
402 crate::sync::bootstrap_db(&mut sync_ctx).await?;
403 let conn = db.connect()?;
404 crate::sync::try_pull(&mut sync_ctx, &conn).await
405 },
406 _ => Err(Error::SyncNotSupported(format!("{:?}", self.db_type))),
407 }
408 }
409
410 pub async fn sync_until(&self, replication_index: FrameNo) -> Result<Replicated> {
413 if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
414 db.sync_until(replication_index).await
415 } else {
416 Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
417 }
418 }
419
420 pub async fn sync_frames(&self, frames: crate::replication::Frames) -> Result<Option<FrameNo>> {
423 if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
424 db.sync_frames(frames).await
425 } else {
426 Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
427 }
428 }
429
430 pub async fn flush_replicator(&self) -> Result<Option<FrameNo>> {
433 if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
434 db.flush_replicator().await
435 } else {
436 Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
437 }
438 }
439
440 pub async fn replication_index(&self) -> Result<Option<FrameNo>> {
442 if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
443 db.replication_index().await
444 } else {
445 Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
446 }
447 }
448
449 pub fn freeze(self) -> Result<Database> {
457 match self.db_type {
458 DbType::Sync { db, .. } => {
459 let path = db.path().to_string();
460 Ok(Database {
461 db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_saftey_assert: false },
462 max_write_replication_index: Default::default(),
463 })
464 }
465 t => Err(Error::FreezeNotSupported(format!("{:?}", t)))
466 }
467 }
468
469 pub fn max_write_replication_index(&self) -> Option<FrameNo> {
471 let index = self
472 .max_write_replication_index
473 .load(std::sync::atomic::Ordering::SeqCst);
474 if index == 0 {
475 None
476 } else {
477 Some(index)
478 }
479 }
480 }
481}
482
483impl Database {}
484
485cfg_remote! {
486 impl Database {
487 #[deprecated = "Use the new `Builder` to construct `Database`"]
489 pub fn open_remote(url: impl Into<String>, auth_token: impl Into<String>) -> Result<Self> {
490 let https = connector()?;
491
492 Self::open_remote_with_connector_internal(url, auth_token, https, None)
493 }
494
495 #[doc(hidden)]
496 pub fn open_remote_internal(
497 url: impl Into<String>,
498 auth_token: impl Into<String>,
499 version: impl Into<String>,
500 ) -> Result<Self> {
501 let https = connector()?;
502
503 Self::open_remote_with_connector_internal(url, auth_token, https, Some(version.into()))
504 }
505
506 #[deprecated = "Use the new `Builder` to construct `Database`"]
508 pub fn open_remote_with_connector<C>(
509 url: impl Into<String>,
510 auth_token: impl Into<String>,
511 connector: C,
512 ) -> Result<Self>
513 where
514 C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
515 C::Response: crate::util::Socket,
516 C::Future: Send + 'static,
517 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
518 {
519 Self::open_remote_with_connector_internal(url, auth_token, connector, None)
520 }
521
522 #[doc(hidden)]
523 fn open_remote_with_connector_internal<C>(
524 url: impl Into<String>,
525 auth_token: impl Into<String>,
526 connector: C,
527 version: Option<String>
528 ) -> Result<Self>
529 where
530 C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
531 C::Response: crate::util::Socket,
532 C::Future: Send + 'static,
533 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
534 {
535 use tower::ServiceExt;
536
537 let svc = connector
538 .map_err(|e| e.into())
539 .map_response(|s| Box::new(s) as Box<dyn crate::util::Socket>);
540 Ok(Database {
541 db_type: DbType::Remote {
542 url: url.into(),
543 auth_token: auth_token.into(),
544 connector: crate::util::ConnectorService::new(svc),
545 version,
546 namespace: None,
547 },
548 max_write_replication_index: Default::default(),
549 })
550 }
551 }
552}
553
554impl Database {
555 #[allow(unreachable_patterns)]
565 pub fn connect(&self) -> Result<Connection> {
566 match &self.db_type {
567 #[cfg(feature = "core")]
568 DbType::Memory { db } => {
569 use crate::local::impls::LibsqlConnection;
570
571 let conn = db.connect()?;
572
573 let conn = std::sync::Arc::new(LibsqlConnection { conn });
574
575 Ok(Connection { conn })
576 }
577
578 #[cfg(feature = "core")]
579 DbType::File {
580 path,
581 flags,
582 encryption_config,
583 skip_saftey_assert,
584 } => {
585 use crate::local::impls::LibsqlConnection;
586
587 let db = if !skip_saftey_assert {
588 crate::local::Database::open(path, *flags)?
589 } else {
590 unsafe { crate::local::Database::open_raw(path, *flags)? }
591 };
592
593 let conn = db.connect()?;
594
595 if !cfg!(feature = "encryption") && encryption_config.is_some() {
596 return Err(crate::Error::Misuse(
597 "Encryption is not enabled: enable the `encryption` feature in order to enable encryption-at-rest".to_string(),
598 ));
599 }
600
601 #[cfg(feature = "encryption")]
602 if let Some(cfg) = encryption_config {
603 if unsafe {
604 libsql_sys::connection::set_encryption_cipher(conn.raw, cfg.cipher_id())
605 } == -1
606 {
607 return Err(crate::Error::Misuse(
608 "failed to set encryption cipher".to_string(),
609 ));
610 }
611 if unsafe {
612 libsql_sys::connection::set_encryption_key(conn.raw, &cfg.encryption_key)
613 } != crate::ffi::SQLITE_OK
614 {
615 return Err(crate::Error::Misuse(
616 "failed to set encryption key".to_string(),
617 ));
618 }
619 }
620
621 let conn = std::sync::Arc::new(LibsqlConnection { conn });
622
623 Ok(Connection { conn })
624 }
625
626 #[cfg(feature = "replication")]
627 DbType::Sync {
628 db,
629 encryption_config,
630 } => {
631 use crate::local::impls::LibsqlConnection;
632
633 let conn = db.connect()?;
634
635 if !cfg!(feature = "encryption") && encryption_config.is_some() {
636 return Err(crate::Error::Misuse(
637 "Encryption is not enabled: enable the `encryption` feature in order to enable encryption-at-rest".to_string(),
638 ));
639 }
640 #[cfg(feature = "encryption")]
641 if let Some(cfg) = encryption_config {
642 if unsafe {
643 libsql_sys::connection::set_encryption_cipher(conn.raw, cfg.cipher_id())
644 } == -1
645 {
646 return Err(crate::Error::Misuse(
647 "failed to set encryption cipher".to_string(),
648 ));
649 }
650 if unsafe {
651 libsql_sys::connection::set_encryption_key(conn.raw, &cfg.encryption_key)
652 } != crate::ffi::SQLITE_OK
653 {
654 return Err(crate::Error::Misuse(
655 "failed to set encryption key".to_string(),
656 ));
657 }
658 }
659
660 let local = LibsqlConnection { conn };
661 let writer = local.conn.new_connection_writer();
662 let remote = crate::replication::RemoteConnection::new(
663 local,
664 writer,
665 self.max_write_replication_index.clone(),
666 );
667 let conn = std::sync::Arc::new(remote);
668
669 Ok(Connection { conn })
670 }
671
672 #[cfg(feature = "sync")]
673 DbType::Offline {
674 db,
675 remote_writes,
676 read_your_writes,
677 url,
678 auth_token,
679 connector,
680 ..
681 } => {
682 use crate::{
683 hrana::connection::HttpConnection, local::impls::LibsqlConnection,
684 replication::connection::State, sync::connection::SyncedConnection,
685 };
686 use tokio::sync::Mutex;
687
688 let _ = tokio::task::block_in_place(move || {
689 let rt = tokio::runtime::Builder::new_current_thread()
690 .enable_all()
691 .build()
692 .unwrap();
693 rt.block_on(async {
694 let _ = db.bootstrap_db().await;
697 })
698 });
699
700 let local = db.connect()?;
701
702 if *remote_writes {
703 let synced = SyncedConnection {
704 local,
705 remote: HttpConnection::new_with_connector(
706 url.clone(),
707 auth_token.clone(),
708 connector.clone(),
709 None,
710 None,
711 ),
712 read_your_writes: *read_your_writes,
713 context: db.sync_ctx.clone().unwrap(),
714 state: std::sync::Arc::new(Mutex::new(State::Init)),
715 };
716
717 let conn = std::sync::Arc::new(synced);
718 return Ok(Connection { conn });
719 }
720
721 let conn = std::sync::Arc::new(LibsqlConnection { conn: local });
722 Ok(Connection { conn })
723 }
724
725 #[cfg(feature = "remote")]
726 DbType::Remote {
727 url,
728 auth_token,
729 connector,
730 version,
731 namespace,
732 } => {
733 let conn = std::sync::Arc::new(
734 crate::hrana::connection::HttpConnection::new_with_connector(
735 url,
736 auth_token,
737 connector.clone(),
738 version.as_ref().map(|s| s.as_str()),
739 namespace.as_ref().map(|s| s.as_str()),
740 ),
741 );
742
743 Ok(Connection { conn })
744 }
745
746 _ => unreachable!("no database type set"),
747 }
748 }
749}
750
751#[cfg(any(
752 all(feature = "tls", feature = "replication"),
753 all(feature = "tls", feature = "remote"),
754 all(feature = "tls", feature = "sync")
755))]
756fn connector() -> Result<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>> {
757 let mut http = hyper::client::HttpConnector::new();
758 http.enforce_http(false);
759 http.set_nodelay(true);
760
761 Ok(hyper_rustls::HttpsConnectorBuilder::new()
762 .with_native_roots()
763 .map_err(crate::Error::InvalidTlsConfiguration)?
764 .https_or_http()
765 .enable_http1()
766 .wrap_connector(http))
767}
768
769#[cfg(any(
770 all(not(feature = "tls"), feature = "replication"),
771 all(not(feature = "tls"), feature = "remote"),
772 all(not(feature = "tls"), feature = "sync")
773))]
774fn connector() -> Result<hyper::client::HttpConnector> {
775 panic!("The `tls` feature is disabled, you must provide your own http connector");
776}
777
778impl std::fmt::Debug for Database {
779 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
780 f.debug_struct("Database").finish()
781 }
782}