fts_sqlite/lib.rs
1#![warn(missing_docs)]
// Note: the link definitions below override the corresponding links in the README so that they point to the docs.rs pages of each crate.
3//! [fts_core]: https://docs.rs/fts_core/latest/fts_core/index.html
4//! [fts_axum]: https://docs.rs/fts_axum/latest/fts_axum/index.html
5//! [fts_solver]: https://docs.rs/fts_solver/latest/fts_solver/index.html
6//! [fts_sqlite]: https://docs.rs/fts_sqlite/latest/fts_sqlite/index.html
7#![doc = include_str!("../README.md")]
8
9use sqlx::sqlite;
10use std::{str::FromStr, time::Duration};
11use tokio::try_join;
12
13pub mod config;
14mod r#impl;
15pub mod types;
16
17use config::SqliteConfig;
18
19/// SQLite database implementation for flow trading repositories.
20///
21/// This struct provides separate reader and writer connection pools to a SQLite database,
22/// implementing all the repository traits defined in `fts-core`. The separation of read
23/// and write connections allows for better concurrency control and follows SQLite best
24/// practices for Write-Ahead Logging (WAL) mode.
25///
26/// # Connection Management
27///
28/// - `reader`: A connection pool for read operations, allowing concurrent reads
29/// - `writer`: A single-connection pool for write operations, ensuring serialized writes
30///
31/// # Example
32///
33/// ```no_run
34/// # use fts_sqlite::{Db, config::SqliteConfig, types::DateTime};
35/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
36/// let config = SqliteConfig::default();
37/// let now = DateTime::from(time::OffsetDateTime::now_utc());
38/// let db = Db::open(&config, now).await?;
39/// # Ok(())
40/// # }
41/// ```
42#[derive(Clone)]
43pub struct Db {
44 /// Connection pool for read operations
45 pub reader: sqlx::Pool<sqlx::Sqlite>,
46 /// Connection pool for write operations (limited to 1 connection)
47 pub writer: sqlx::Pool<sqlx::Sqlite>,
48}
49
50impl Db {
51 /// Open a connection to the specified SQLite database.
52 ///
53 /// Creates a new database if one doesn't exist (when `create_if_missing` is true),
54 /// applies all pending migrations, and ensures the batch table is initialized.
55 ///
56 /// # Arguments
57 ///
58 /// * `config` - Configuration specifying database path and creation options
59 /// * `as_of` - Initial timestamp for the batch table if creating a new database
60 ///
61 /// # Database Configuration
62 ///
63 /// The database is configured with the following settings for optimal performance:
64 /// - WAL mode for better concurrency
65 /// - Foreign keys enabled for referential integrity
66 /// - Optimized cache and memory settings for flow trading workloads
67 ///
68 /// # Errors
69 ///
70 /// Returns `sqlx::Error` if:
71 /// - Database connection fails
72 /// - Migrations fail to apply
73 /// - Initial batch row creation fails
74 pub async fn open(config: &SqliteConfig, as_of: types::DateTime) -> Result<Self, sqlx::Error> {
75 let db_path = config
76 .database_path
77 .as_ref()
78 .map(|p| p.to_string_lossy().into_owned());
79
80 // Use the same hardcoded pragmas as the original open() method
81 let options =
82 sqlite::SqliteConnectOptions::from_str(db_path.as_deref().unwrap_or(":memory:"))?
83 .busy_timeout(Duration::from_secs(5))
84 .foreign_keys(true)
85 .journal_mode(sqlite::SqliteJournalMode::Wal)
86 .synchronous(sqlite::SqliteSynchronous::Normal)
87 .pragma("cache_size", "1000000000")
88 .pragma("journal_size_limit", "27103364")
89 .pragma("mmap_size", "134217728")
90 .pragma("temp_store", "memory")
91 .create_if_missing(config.create_if_missing);
92
93 // TODO: setting read_only(true) on the reader seems to also lock the writer, at least when using :memory:. Need to investigate.
94 let reader = sqlite::SqlitePoolOptions::new().connect_with(options.clone());
95 let writer = sqlite::SqlitePoolOptions::new()
96 .max_connections(1)
97 .connect_with(options);
98
99 let (reader, writer) = try_join!(reader, writer)?;
100
101 // Run any pending migrations before returning
102 sqlx::migrate!("./schema").run(&writer).await?;
103
104 // Also, ensure there is one row in the batch table.
105 // This is important because of the trigger-managed temporal tables.
106 // The `batch` table consists of a single row with JSON columns. On update,
107 // a trigger will "explode" the JSON values into the appropriate outcome-tracking
108 // tables, similarly to how we manage portfolios and demand curves.
109 sqlx::query!(
110 r#"
111 insert into
112 batch (id, as_of, portfolio_outcomes, product_outcomes)
113 values
114 (0, $1, jsonb('{}'), jsonb('{}'))
115 on conflict
116 do nothing
117 "#,
118 as_of
119 )
120 .execute(&writer)
121 .await?;
122
123 Ok(Self { reader, writer })
124 }
125}