1#![allow(deprecated)]
2
3mod builder;
4
5pub use builder::Builder;
6
7#[cfg(feature = "core")]
8pub use libsql_sys::{Cipher, EncryptionConfig};
9
10use crate::{Connection, Result};
11use std::fmt;
12use std::sync::atomic::AtomicU64;
13
14cfg_core! {
15 bitflags::bitflags! {
16 #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
19 #[repr(C)]
20 pub struct OpenFlags: ::std::os::raw::c_int {
21 const SQLITE_OPEN_READ_ONLY = libsql_sys::ffi::SQLITE_OPEN_READONLY;
22 const SQLITE_OPEN_READ_WRITE = libsql_sys::ffi::SQLITE_OPEN_READWRITE;
23 const SQLITE_OPEN_CREATE = libsql_sys::ffi::SQLITE_OPEN_CREATE;
24 }
25 }
26
27 impl Default for OpenFlags {
28 #[inline]
29 fn default() -> OpenFlags {
30 OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_CREATE
31 }
32 }
33}
34
35cfg_replication_or_sync! {
36
37 pub type FrameNo = u64;
38
39 #[derive(Debug)]
40 #[allow(dead_code)]
42 pub struct Replicated {
43 pub(crate) frame_no: Option<FrameNo>,
44 pub(crate) frames_synced: usize,
45 }
46
47 impl Replicated {
48 #[allow(dead_code)]
54 pub fn frame_no(&self) -> Option<FrameNo> {
55 self.frame_no
56 }
57
58 #[allow(dead_code)]
62 pub fn frames_synced(&self) -> usize {
63 self.frames_synced
64 }
65 }
66
67}
68
69cfg_sync! {
70 #[derive(Default)]
71 pub enum SyncProtocol {
72 #[default]
73 Auto,
74 V1,
75 V2,
76 }
77}
78
79enum DbType {
80 #[cfg(feature = "core")]
81 Memory { db: crate::local::Database },
82 #[cfg(feature = "core")]
83 File {
84 path: String,
85 flags: OpenFlags,
86 encryption_config: Option<EncryptionConfig>,
87 skip_saftey_assert: bool,
88 },
89 #[cfg(feature = "replication")]
90 Sync {
91 db: crate::local::Database,
92 encryption_config: Option<EncryptionConfig>,
93 },
94 #[cfg(feature = "sync")]
95 Offline {
96 db: crate::local::Database,
97 remote_writes: bool,
98 read_your_writes: bool,
99 url: String,
100 auth_token: String,
101 connector: crate::util::ConnectorService,
102 },
103 #[cfg(feature = "remote")]
104 Remote {
105 url: String,
106 auth_token: String,
107 connector: crate::util::ConnectorService,
108 version: Option<String>,
109 },
110}
111
112impl fmt::Debug for DbType {
113 #[allow(unreachable_patterns)]
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 match self {
116 #[cfg(feature = "core")]
117 Self::Memory { .. } => write!(f, "Memory"),
118 #[cfg(feature = "core")]
119 Self::File { .. } => write!(f, "File"),
120 #[cfg(feature = "replication")]
121 Self::Sync { .. } => write!(f, "Sync"),
122 #[cfg(feature = "sync")]
123 Self::Offline { .. } => write!(f, "Offline"),
124 #[cfg(feature = "remote")]
125 Self::Remote { .. } => write!(f, "Remote"),
126 _ => write!(f, "no database type set"),
127 }
128 }
129}
130
131pub struct Database {
134 db_type: DbType,
135 #[allow(dead_code)]
137 max_write_replication_index: std::sync::Arc<AtomicU64>,
138}
139
140cfg_core! {
141 impl Database {
142 #[deprecated = "Use the new `Builder` to construct `Database`"]
144 pub fn open_in_memory() -> Result<Self> {
145 let db = crate::local::Database::open(":memory:", OpenFlags::default())?;
146
147 Ok(Database {
148 db_type: DbType::Memory { db },
149 max_write_replication_index: Default::default(),
150 })
151 }
152
153 #[deprecated = "Use the new `Builder` to construct `Database`"]
155 pub fn open(db_path: impl Into<String>) -> Result<Database> {
156 Database::open_with_flags(db_path, OpenFlags::default())
157 }
158
159 #[deprecated = "Use the new `Builder` to construct `Database`"]
161 pub fn open_with_flags(db_path: impl Into<String>, flags: OpenFlags) -> Result<Database> {
162 Ok(Database {
163 db_type: DbType::File {
164 path: db_path.into(),
165 flags,
166 encryption_config: None,
167 skip_saftey_assert: false,
168 },
169 max_write_replication_index: Default::default(),
170 })
171 }
172 }
173}
174
175cfg_replication! {
176 use crate::Error;
177
178
179 impl Database {
180 #[deprecated = "Use the new `Builder` to construct `Database`"]
182 pub async fn open_with_local_sync(
183 db_path: impl Into<String>,
184 encryption_config: Option<EncryptionConfig>
185 ) -> Result<Database> {
186 let db = crate::local::Database::open_local_sync(
187 db_path,
188 OpenFlags::default(),
189 encryption_config.clone()
190 ).await?;
191
192 Ok(Database {
193 db_type: DbType::Sync { db, encryption_config },
194 max_write_replication_index: Default::default(),
195 })
196 }
197
198
199 #[deprecated = "Use the new `Builder` to construct `Database`"]
202 pub async fn open_with_local_sync_remote_writes(
203 db_path: impl Into<String>,
204 endpoint: String,
205 auth_token: String,
206 encryption_config: Option<EncryptionConfig>,
207 ) -> Result<Database> {
208 let https = connector()?;
209
210 Self::open_with_local_sync_remote_writes_connector(
211 db_path,
212 endpoint,
213 auth_token,
214 https,
215 encryption_config
216 ).await
217 }
218
219 #[deprecated = "Use the new `Builder` to construct `Database`"]
222 pub async fn open_with_local_sync_remote_writes_connector<C>(
223 db_path: impl Into<String>,
224 endpoint: String,
225 auth_token: String,
226 connector: C,
227 encryption_config: Option<EncryptionConfig>,
228 ) -> Result<Database>
229 where
230 C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
231 C::Response: crate::util::Socket,
232 C::Future: Send + 'static,
233 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
234 {
235 use tower::ServiceExt;
236
237 let svc = connector
238 .map_err(|e| e.into())
239 .map_response(|s| Box::new(s) as Box<dyn crate::util::Socket>);
240
241 let svc = crate::util::ConnectorService::new(svc);
242
243 let db = crate::local::Database::open_local_sync_remote_writes(
244 svc,
245 db_path.into(),
246 endpoint,
247 auth_token,
248 None,
249 OpenFlags::default(),
250 encryption_config.clone(),
251 None,
252 ).await?;
253
254 Ok(Database {
255 db_type: DbType::Sync { db, encryption_config },
256 max_write_replication_index: Default::default(),
257 })
258 }
259
260 #[deprecated = "Use the new `Builder` to construct `Database`"]
262 pub async fn open_with_remote_sync(
263 db_path: impl Into<String>,
264 url: impl Into<String>,
265 token: impl Into<String>,
266 encryption_config: Option<EncryptionConfig>,
267 ) -> Result<Database> {
268 let https = connector()?;
269
270 Self::open_with_remote_sync_connector(db_path, url, token, https, false, encryption_config).await
271 }
272
273 #[deprecated = "Use the new `Builder` to construct `Database`"]
279 pub async fn open_with_remote_sync_consistent(
280 db_path: impl Into<String>,
281 url: impl Into<String>,
282 token: impl Into<String>,
283 encryption_config: Option<EncryptionConfig>,
284 ) -> Result<Database> {
285 let https = connector()?;
286
287 Self::open_with_remote_sync_connector(db_path, url, token, https, true, encryption_config).await
288 }
289
290 #[deprecated = "Use the new `Builder` to construct `Database`"]
293 pub async fn open_with_remote_sync_connector<C>(
294 db_path: impl Into<String>,
295 url: impl Into<String>,
296 token: impl Into<String>,
297 connector: C,
298 read_your_writes: bool,
299 encryption_config: Option<EncryptionConfig>,
300 ) -> Result<Database>
301 where
302 C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
303 C::Response: crate::util::Socket,
304 C::Future: Send + 'static,
305 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
306 {
307 Self::open_with_remote_sync_connector_internal(
308 db_path,
309 url,
310 token,
311 connector,
312 None,
313 read_your_writes,
314 encryption_config,
315 None
316 ).await
317 }
318
319 #[doc(hidden)]
320 pub async fn open_with_remote_sync_internal(
321 db_path: impl Into<String>,
322 url: impl Into<String>,
323 token: impl Into<String>,
324 version: Option<String>,
325 read_your_writes: bool,
326 encryption_config: Option<EncryptionConfig>,
327 sync_interval: Option<std::time::Duration>,
328 ) -> Result<Database> {
329 let https = connector()?;
330
331 Self::open_with_remote_sync_connector_internal(
332 db_path,
333 url,
334 token,
335 https,
336 version,
337 read_your_writes,
338 encryption_config,
339 sync_interval
340 ).await
341 }
342
343 #[doc(hidden)]
344 async fn open_with_remote_sync_connector_internal<C>(
345 db_path: impl Into<String>,
346 url: impl Into<String>,
347 token: impl Into<String>,
348 connector: C,
349 version: Option<String>,
350 read_your_writes: bool,
351 encryption_config: Option<EncryptionConfig>,
352 sync_interval: Option<std::time::Duration>,
353 ) -> Result<Database>
354 where
355 C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
356 C::Response: crate::util::Socket,
357 C::Future: Send + 'static,
358 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
359 {
360 use tower::ServiceExt;
361
362 let svc = connector
363 .map_err(|e| e.into())
364 .map_response(|s| Box::new(s) as Box<dyn crate::util::Socket>);
365
366 let svc = crate::util::ConnectorService::new(svc);
367
368 let db = crate::local::Database::open_http_sync_internal(
369 svc,
370 db_path.into(),
371 url.into(),
372 token.into(),
373 version,
374 read_your_writes,
375 encryption_config.clone(),
376 sync_interval,
377 None,
378 None
379 ).await?;
380
381 Ok(Database {
382 db_type: DbType::Sync { db, encryption_config },
383 max_write_replication_index: Default::default(),
384 })
385 }
386
387
388 pub async fn sync(&self) -> Result<Replicated> {
391 match &self.db_type {
392 #[cfg(feature = "replication")]
393 DbType::Sync { db, encryption_config: _ } => db.sync().await,
394 #[cfg(feature = "sync")]
395 DbType::Offline { db, .. } => db.sync_offline().await,
396 _ => Err(Error::SyncNotSupported(format!("{:?}", self.db_type))),
397 }
398 }
399
400 pub async fn sync_until(&self, replication_index: FrameNo) -> Result<Replicated> {
403 if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
404 db.sync_until(replication_index).await
405 } else {
406 Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
407 }
408 }
409
410 pub async fn sync_frames(&self, frames: crate::replication::Frames) -> Result<Option<FrameNo>> {
413 if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
414 db.sync_frames(frames).await
415 } else {
416 Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
417 }
418 }
419
420 pub async fn flush_replicator(&self) -> Result<Option<FrameNo>> {
423 if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
424 db.flush_replicator().await
425 } else {
426 Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
427 }
428 }
429
430 pub async fn replication_index(&self) -> Result<Option<FrameNo>> {
432 if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
433 db.replication_index().await
434 } else {
435 Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
436 }
437 }
438
439 pub fn freeze(self) -> Result<Database> {
447 match self.db_type {
448 DbType::Sync { db, .. } => {
449 let path = db.path().to_string();
450 Ok(Database {
451 db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None, skip_saftey_assert: false },
452 max_write_replication_index: Default::default(),
453 })
454 }
455 t => Err(Error::FreezeNotSupported(format!("{:?}", t)))
456 }
457 }
458
459 pub fn max_write_replication_index(&self) -> Option<FrameNo> {
461 let index = self
462 .max_write_replication_index
463 .load(std::sync::atomic::Ordering::SeqCst);
464 if index == 0 {
465 None
466 } else {
467 Some(index)
468 }
469 }
470 }
471}
472
473impl Database {}
474
475cfg_remote! {
476 impl Database {
477 #[deprecated = "Use the new `Builder` to construct `Database`"]
479 pub fn open_remote(url: impl Into<String>, auth_token: impl Into<String>) -> Result<Self> {
480 let https = connector()?;
481
482 Self::open_remote_with_connector_internal(url, auth_token, https, None)
483 }
484
485 #[doc(hidden)]
486 pub fn open_remote_internal(
487 url: impl Into<String>,
488 auth_token: impl Into<String>,
489 version: impl Into<String>,
490 ) -> Result<Self> {
491 let https = connector()?;
492
493 Self::open_remote_with_connector_internal(url, auth_token, https, Some(version.into()))
494 }
495
496 #[deprecated = "Use the new `Builder` to construct `Database`"]
498 pub fn open_remote_with_connector<C>(
499 url: impl Into<String>,
500 auth_token: impl Into<String>,
501 connector: C,
502 ) -> Result<Self>
503 where
504 C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
505 C::Response: crate::util::Socket,
506 C::Future: Send + 'static,
507 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
508 {
509 Self::open_remote_with_connector_internal(url, auth_token, connector, None)
510 }
511
512 #[doc(hidden)]
513 fn open_remote_with_connector_internal<C>(
514 url: impl Into<String>,
515 auth_token: impl Into<String>,
516 connector: C,
517 version: Option<String>
518 ) -> Result<Self>
519 where
520 C: tower::Service<http::Uri> + Send + Clone + Sync + 'static,
521 C::Response: crate::util::Socket,
522 C::Future: Send + 'static,
523 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
524 {
525 use tower::ServiceExt;
526
527 let svc = connector
528 .map_err(|e| e.into())
529 .map_response(|s| Box::new(s) as Box<dyn crate::util::Socket>);
530 Ok(Database {
531 db_type: DbType::Remote {
532 url: url.into(),
533 auth_token: auth_token.into(),
534 connector: crate::util::ConnectorService::new(svc),
535 version,
536 },
537 max_write_replication_index: Default::default(),
538 })
539 }
540 }
541}
542
543impl Database {
544 #[allow(unreachable_patterns)]
554 pub fn connect(&self) -> Result<Connection> {
555 match &self.db_type {
556 #[cfg(feature = "core")]
557 DbType::Memory { db } => {
558 use crate::local::impls::LibsqlConnection;
559
560 let conn = db.connect()?;
561
562 let conn = std::sync::Arc::new(LibsqlConnection { conn });
563
564 Ok(Connection { conn })
565 }
566
567 #[cfg(feature = "core")]
568 DbType::File {
569 path,
570 flags,
571 encryption_config,
572 skip_saftey_assert,
573 } => {
574 use crate::local::impls::LibsqlConnection;
575
576 let db = if !skip_saftey_assert {
577 crate::local::Database::open(path, *flags)?
578 } else {
579 unsafe { crate::local::Database::open_raw(path, *flags)? }
580 };
581
582 let conn = db.connect()?;
583
584 if !cfg!(feature = "encryption") && encryption_config.is_some() {
585 return Err(crate::Error::Misuse(
586 "Encryption is not enabled: enable the `encryption` feature in order to enable encryption-at-rest".to_string(),
587 ));
588 }
589
590 #[cfg(feature = "encryption")]
591 if let Some(cfg) = encryption_config {
592 if unsafe {
593 libsql_sys::connection::set_encryption_cipher(conn.raw, cfg.cipher_id())
594 } == -1
595 {
596 return Err(crate::Error::Misuse(
597 "failed to set encryption cipher".to_string(),
598 ));
599 }
600 if unsafe {
601 libsql_sys::connection::set_encryption_key(conn.raw, &cfg.encryption_key)
602 } != crate::ffi::SQLITE_OK
603 {
604 return Err(crate::Error::Misuse(
605 "failed to set encryption key".to_string(),
606 ));
607 }
608 }
609
610 let conn = std::sync::Arc::new(LibsqlConnection { conn });
611
612 Ok(Connection { conn })
613 }
614
615 #[cfg(feature = "replication")]
616 DbType::Sync {
617 db,
618 encryption_config,
619 } => {
620 use crate::local::impls::LibsqlConnection;
621
622 let conn = db.connect()?;
623
624 if !cfg!(feature = "encryption") && encryption_config.is_some() {
625 return Err(crate::Error::Misuse(
626 "Encryption is not enabled: enable the `encryption` feature in order to enable encryption-at-rest".to_string(),
627 ));
628 }
629 #[cfg(feature = "encryption")]
630 if let Some(cfg) = encryption_config {
631 if unsafe {
632 libsql_sys::connection::set_encryption_cipher(conn.raw, cfg.cipher_id())
633 } == -1
634 {
635 return Err(crate::Error::Misuse(
636 "failed to set encryption cipher".to_string(),
637 ));
638 }
639 if unsafe {
640 libsql_sys::connection::set_encryption_key(conn.raw, &cfg.encryption_key)
641 } != crate::ffi::SQLITE_OK
642 {
643 return Err(crate::Error::Misuse(
644 "failed to set encryption key".to_string(),
645 ));
646 }
647 }
648
649 let local = LibsqlConnection { conn };
650 let writer = local.conn.new_connection_writer();
651 let remote = crate::replication::RemoteConnection::new(
652 local,
653 writer,
654 self.max_write_replication_index.clone(),
655 );
656 let conn = std::sync::Arc::new(remote);
657
658 Ok(Connection { conn })
659 }
660
661 #[cfg(feature = "sync")]
662 DbType::Offline {
663 db,
664 remote_writes,
665 read_your_writes,
666 url,
667 auth_token,
668 connector,
669 } => {
670 use crate::{
671 hrana::{connection::HttpConnection, hyper::HttpSender},
672 local::impls::LibsqlConnection,
673 replication::connection::State,
674 sync::connection::SyncedConnection,
675 };
676 use tokio::sync::Mutex;
677
678 let local = db.connect()?;
679
680 if *remote_writes {
681 let synced = SyncedConnection {
682 local,
683 remote: HttpConnection::new(
684 url.clone(),
685 auth_token.clone(),
686 HttpSender::new(connector.clone(), None),
687 ),
688 read_your_writes: *read_your_writes,
689 context: db.sync_ctx.clone().unwrap(),
690 state: std::sync::Arc::new(Mutex::new(State::Init)),
691 };
692
693 let conn = std::sync::Arc::new(synced);
694 return Ok(Connection { conn });
695 }
696
697 let conn = std::sync::Arc::new(LibsqlConnection { conn: local });
698 Ok(Connection { conn })
699 }
700
701 #[cfg(feature = "remote")]
702 DbType::Remote {
703 url,
704 auth_token,
705 connector,
706 version,
707 } => {
708 let conn = std::sync::Arc::new(
709 crate::hrana::connection::HttpConnection::new_with_connector(
710 url,
711 auth_token,
712 connector.clone(),
713 version.as_ref().map(|s| s.as_str()),
714 ),
715 );
716
717 Ok(Connection { conn })
718 }
719
720 _ => unreachable!("no database type set"),
721 }
722 }
723}
724
725#[cfg(any(
726 all(feature = "tls", feature = "replication"),
727 all(feature = "tls", feature = "remote"),
728 all(feature = "tls", feature = "sync")
729))]
730fn connector() -> Result<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>> {
731 let mut http = hyper::client::HttpConnector::new();
732 http.enforce_http(false);
733 http.set_nodelay(true);
734
735 Ok(hyper_rustls::HttpsConnectorBuilder::new()
736 .with_native_roots()
737 .map_err(crate::Error::InvalidTlsConfiguration)?
738 .https_or_http()
739 .enable_http1()
740 .wrap_connector(http))
741}
742
743#[cfg(any(
744 all(not(feature = "tls"), feature = "replication"),
745 all(not(feature = "tls"), feature = "remote"),
746 all(not(feature = "tls"), feature = "sync")
747))]
748fn connector() -> Result<hyper::client::HttpConnector> {
749 panic!("The `tls` feature is disabled, you must provide your own http connector");
750}
751
752impl std::fmt::Debug for Database {
753 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
754 f.debug_struct("Database").finish()
755 }
756}