fts_sqlite/
lib.rs

1#![warn(missing_docs)]
// Note: these link definitions override the README's links so that they point to the rust docs of the fts-sqlite crate.
3//! [fts_core]: https://docs.rs/fts_core/latest/fts_core/index.html
4//! [fts_axum]: https://docs.rs/fts_axum/latest/fts_axum/index.html
5//! [fts_solver]: https://docs.rs/fts_solver/latest/fts_solver/index.html
6//! [fts_sqlite]: https://docs.rs/fts_sqlite/latest/fts_sqlite/index.html
7#![doc = include_str!("../README.md")]
8
9use sqlx::sqlite;
10use std::{str::FromStr, time::Duration};
11use tokio::try_join;
12
13pub mod config;
14mod r#impl;
15pub mod types;
16
17use config::SqliteConfig;
18
19/// SQLite database implementation for flow trading repositories.
20///
21/// This struct provides separate reader and writer connection pools to a SQLite database,
22/// implementing all the repository traits defined in `fts-core`. The separation of read
23/// and write connections allows for better concurrency control and follows SQLite best
24/// practices for Write-Ahead Logging (WAL) mode.
25///
26/// # Connection Management
27///
28/// - `reader`: A connection pool for read operations, allowing concurrent reads
29/// - `writer`: A single-connection pool for write operations, ensuring serialized writes
30///
31/// # Example
32///
33/// ```no_run
34/// # use fts_sqlite::{Db, config::SqliteConfig, types::DateTime};
35/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
36/// let config = SqliteConfig::default();
37/// let now = DateTime::from(time::OffsetDateTime::now_utc());
38/// let db = Db::open(&config, now).await?;
39/// # Ok(())
40/// # }
41/// ```
#[derive(Clone)]
pub struct Db {
    /// Connection pool for read operations; with WAL journaling these reads can proceed concurrently with the writer
    pub reader: sqlx::Pool<sqlx::Sqlite>,
    /// Connection pool for write operations (capped at 1 connection in `open`, so writes are serialized)
    pub writer: sqlx::Pool<sqlx::Sqlite>,
}
49
50impl Db {
51    /// Open a connection to the specified SQLite database.
52    ///
53    /// Creates a new database if one doesn't exist (when `create_if_missing` is true),
54    /// applies all pending migrations, and ensures the batch table is initialized.
55    ///
56    /// # Arguments
57    ///
58    /// * `config` - Configuration specifying database path and creation options
59    /// * `as_of` - Initial timestamp for the batch table if creating a new database
60    ///
61    /// # Database Configuration
62    ///
63    /// The database is configured with the following settings for optimal performance:
64    /// - WAL mode for better concurrency
65    /// - Foreign keys enabled for referential integrity
66    /// - Optimized cache and memory settings for flow trading workloads
67    ///
68    /// # Errors
69    ///
70    /// Returns `sqlx::Error` if:
71    /// - Database connection fails
72    /// - Migrations fail to apply
73    /// - Initial batch row creation fails
74    pub async fn open(config: &SqliteConfig, as_of: types::DateTime) -> Result<Self, sqlx::Error> {
75        let db_path = config
76            .database_path
77            .as_ref()
78            .map(|p| p.to_string_lossy().into_owned());
79
80        // Use the same hardcoded pragmas as the original open() method
81        let options =
82            sqlite::SqliteConnectOptions::from_str(db_path.as_deref().unwrap_or(":memory:"))?
83                .busy_timeout(Duration::from_secs(5))
84                .foreign_keys(true)
85                .journal_mode(sqlite::SqliteJournalMode::Wal)
86                .synchronous(sqlite::SqliteSynchronous::Normal)
87                .pragma("cache_size", "1000000000")
88                .pragma("journal_size_limit", "27103364")
89                .pragma("mmap_size", "134217728")
90                .pragma("temp_store", "memory")
91                .create_if_missing(config.create_if_missing);
92
93        // TODO: setting read_only(true) on the reader seems to also lock the writer, at least when using :memory:. Need to investigate.
94        let reader = sqlite::SqlitePoolOptions::new().connect_with(options.clone());
95        let writer = sqlite::SqlitePoolOptions::new()
96            .max_connections(1)
97            .connect_with(options);
98
99        let (reader, writer) = try_join!(reader, writer)?;
100
101        // Run any pending migrations before returning
102        sqlx::migrate!("./schema").run(&writer).await?;
103
104        // Also, ensure there is one row in the batch table.
105        // This is important because of the trigger-managed temporal tables.
106        // The `batch` table consists of a single row with JSON columns. On update,
107        // a trigger will "explode" the JSON values into the appropriate outcome-tracking
108        // tables, similarly to how we manage portfolios and demand curves.
109        sqlx::query!(
110            r#"
111            insert into
112                batch (id, as_of, portfolio_outcomes, product_outcomes)
113            values
114                (0, $1, jsonb('{}'), jsonb('{}'))
115            on conflict
116                do nothing
117            "#,
118            as_of
119        )
120        .execute(&writer)
121        .await?;
122
123        Ok(Self { reader, writer })
124    }
125}