// Turbosql runtime support: database path configuration, per-thread connections, and migrations.
// Guard: when building unit tests or doctests without the `test` feature, stop
// compilation with the message below.
#[cfg(all(not(feature = "test"), any(test, doctest)))]
compile_error!("turbosql must be tested with '--features test'");

// With the `test` feature enabled, pull the README in for doctest builds so its
// code examples are exercised.
#[cfg(all(feature = "test", doctest))]
doc_comment::doctest!("../../README.md");

use itertools::{
 EitherOrBoth::{Both, Left, Right},
 Itertools,
};
use rusqlite::{Connection, OpenFlags};
use serde::Deserialize;
use std::cell::RefCell;
use std::path::{Path, PathBuf};
use std::sync::Mutex;

// these re-exports are used in macro expansions

#[doc(hidden)]
pub use once_cell::sync::Lazy;
#[doc(hidden)]
pub use rusqlite::{
 params, types::FromSql, types::FromSqlResult, types::ToSql, types::ToSqlOutput, types::Value,
 types::ValueRef, Error, OptionalExtension, Result,
};
#[doc(hidden)]
pub use serde::Serialize;
pub use turbosql_impl::{execute, select, Turbosql};

/// Owned byte buffer.
///
/// Currently a plain type alias for `Vec<u8>`; it may one day become a wrapper type that
/// implements the `Read`, `Write` and `Seek` traits.
pub type Blob = Vec<u8>;

/// Deserialized form of the migrations TOML document that `run_migrations` embeds.
///
/// Both keys are optional; the `Default` value has neither set.
#[derive(Clone, Debug, Deserialize, Default)]
struct MigrationsToml {
 /// Ordered list of migration SQL statements. `run_migrations` skips entries that start
 /// with `--` (comments) and applies the rest in order.
 migrations_append_only: Option<Vec<String>>,
 /// Generated schema text. Not read by any code visible in this file; it is only
 /// mentioned in a TODO inside `run_migrations`.
 output_generated_schema_for_your_information_do_not_edit: Option<String>,
}

/// Location of the SQLite database plus a flag recording whether it has been opened yet.
#[derive(Clone, Debug, Default)]
struct DbPath {
 /// Path passed to `Connection::open_with_flags` by `open_db` and `checkpoint`.
 path: PathBuf,
 /// Set to `true` by `open_db` after migrations have run; once set, `set_db_path`
 /// refuses to change `path`.
 opened: bool,
}

// Process-wide database location, initialized lazily on first access and guarded by a mutex.
//
// With the `test` feature the path is the literal `:memory:`. Otherwise it is
// `<data_dir>/<exe_stem>.sqlite`, where `<exe_stem>` is the file stem of the current
// executable and `<data_dir>` is the `directories_next::ProjectDirs` data directory for
// ("org", exe_stem, exe_stem); that directory is created if it does not exist.
// Initialization panics if any of those steps fails.
static __DB_PATH: Lazy<Mutex<DbPath>> = Lazy::new(|| {
 #[cfg(feature = "test")]
 let path = PathBuf::from(":memory:");

 #[cfg(not(feature = "test"))]
 let path = {
  let stem = std::env::current_exe().unwrap().file_stem().unwrap().to_owned();
  let name = stem.to_string_lossy();

  let data_dir =
   directories_next::ProjectDirs::from("org", &name, &name).unwrap().data_dir().to_owned();

  std::fs::create_dir_all(&data_dir).unwrap();

  data_dir.join(stem).with_extension("sqlite")
 };

 // A fresh entry always starts in the "not yet opened" state.
 Mutex::new(DbPath { path, opened: false })
});

/// Brings the database behind `conn` up to date with the embedded migration list.
///
/// The target list is chosen at compile time:
/// * under `cfg(doc)` it is empty,
/// * with the `test` feature it is parsed from `../../test.migrations.toml`,
/// * otherwise it is parsed from `migrations.toml` located five directories above `OUT_DIR`
///   (presumably the root of the crate being built; confirm against the build setup).
///
/// Entries starting with `--` are comments and are ignored. Inside one exclusive
/// transaction the function makes sure the `_turbosql_migrations` bookkeeping table exists,
/// compares the already-applied migrations with the target list entry by entry, and then
/// executes and records every target migration that has not been applied yet.
///
/// # Panics
///
/// Panics if the embedded TOML cannot be decoded, if any SQL statement fails, if an applied
/// migration differs from the target entry at the same position, or if more migrations are
/// recorded in the database than exist in the target list.
fn run_migrations(conn: &mut Connection) {
 cfg_if::cfg_if! {
  if #[cfg(doc)] {
   // if these are what's run in doctests, could add a test struct here to scaffold one-liner tests
   let toml_decoded: MigrationsToml = MigrationsToml::default();
  } else if #[cfg(feature = "test")] {
   let toml_decoded: MigrationsToml = toml::from_str(include_str!("../../test.migrations.toml")).unwrap();
  } else {
   let toml_decoded: MigrationsToml = toml::from_str(include_str!(concat!(env!("OUT_DIR"), "/../../../../../migrations.toml"))).expect("Unable to decode embedded migrations.toml");
  }
 };

 // Comment entries (leading `--`) take no part in the comparison or execution below.
 let target_migrations: Vec<String> = toml_decoded
  .migrations_append_only
  .unwrap_or_default()
  .into_iter()
  .filter(|m| !m.starts_with("--"))
  .collect();

 conn.execute("BEGIN EXCLUSIVE TRANSACTION", params![]).expect("BEGIN EXCLUSIVE TRANSACTION");

 // Best effort: rename a table using the older, un-prefixed name. The error is
 // deliberately ignored because the statement fails whenever that table is absent.
 let _ = conn.execute("ALTER TABLE turbosql_migrations RENAME TO _turbosql_migrations", params![]);

 let result = conn.query_row(
  "SELECT sql FROM sqlite_master WHERE name = ?",
  params!["_turbosql_migrations"],
  |row| row.get::<_, String>(0),
 );

 match result {
  Err(rusqlite::Error::QueryReturnedNoRows) => {
   // no migrations table exists yet, create
   conn
    .execute_batch(
     r#"CREATE TABLE _turbosql_migrations (rowid INTEGER PRIMARY KEY, migration TEXT NOT NULL)"#,
    )
    .expect("CREATE TABLE _turbosql_migrations");
  }
  Err(err) => {
   panic!("Could not query sqlite_master table: {}", err);
  }
  Ok(_) => (),
 }

 // Migrations already recorded in the database, in application order. Recorded comment
 // rows are skipped so they line up with the filtered target list.
 let applied_migrations = conn
  .prepare("SELECT migration FROM _turbosql_migrations ORDER BY rowid")
  .expect("prepare SELECT on _turbosql_migrations")
  .query_map(params![], |row| row.get::<_, String>(0))
  .expect("query _turbosql_migrations")
  .map(|row| row.expect("read row from _turbosql_migrations"))
  .filter(|m| !m.starts_with("--"))
  .collect::<Vec<String>>();

 // Walk both lists in lockstep: the applied list must be a prefix of the target list, and
 // every target entry beyond that prefix is executed and recorded.
 for item in applied_migrations.iter().zip_longest(&target_migrations) {
  match item {
   Both(applied, target) => {
    if applied != target {
     panic!("Mismatch in Turbosql migrations! {:?} != {:?}", applied, target)
    }
   }
   Left(_) => panic!("More migrations are applied than target"),
   Right(migration) => {
    conn
     .execute(migration, params![])
     .unwrap_or_else(|err| panic!("Turbosql migration failed: {}: {:?}", err, migration));
    conn
     .execute("INSERT INTO _turbosql_migrations(migration) VALUES(?)", params![migration])
     .unwrap_or_else(|err| panic!("Could not record Turbosql migration: {}: {:?}", err, migration));
   }
  }
 }

 // TODO: verify schema against output_generated_schema_for_your_information_do_not_edit

 conn.execute("COMMIT", params![]).expect("COMMIT");
}

/// Result row of `PRAGMA wal_checkpoint`, as returned by [`checkpoint`].
///
/// Field meanings below follow the SQLite documentation for `PRAGMA wal_checkpoint`.
#[derive(Debug)]
pub struct CheckpointResult {
 /// First result column: usually 0; 1 if the checkpoint was blocked from completing.
 pub busy: i64,
 /// Second result column: number of modified pages written to the write-ahead log file
 /// (-1 if there is no write-ahead log).
 pub log: i64,
 /// Third result column: number of write-ahead log pages moved back into the database
 /// file (-1 if there is no write-ahead log).
 pub checkpointed: i64,
}

/// Runs a passive WAL checkpoint against the configured database.
///
/// A separate read-write connection is opened just for the checkpoint; the database path
/// lock is held until the function returns. If no other threads have open connections,
/// this will clean up the `-wal` and `-shm` files as well.
///
/// # Errors
///
/// Returns an error if the database cannot be opened or the pragma query fails.
///
/// # Panics
///
/// Panics if the database path mutex is poisoned.
pub fn checkpoint() -> anyhow::Result<CheckpointResult> {
 let started = std::time::Instant::now();
 let location = __DB_PATH.lock().unwrap();

 let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
 let connection = Connection::open_with_flags(&location.path, flags)?;

 // The pragma yields a single row with three integer columns, read in order.
 let outcome = connection.query_row("PRAGMA wal_checkpoint(PASSIVE)", params![], |row| {
  Ok(CheckpointResult {
   busy: row.get(0)?,
   log: row.get(1)?,
   checkpointed: row.get(2)?,
  })
 })?;

 log::info!("db checkpointed in {:?} {:#?}", started.elapsed(), outcome);

 Ok(outcome)
}

/// Opens a connection to the configured database, creating the file if needed.
///
/// Applies the connection PRAGMAs on every call and, on the first call in the process,
/// runs the migrations and marks the database as opened.
///
/// # Panics
///
/// Panics if the path mutex is poisoned, the database cannot be opened, the PRAGMAs fail,
/// or `run_migrations` panics.
fn open_db() -> Connection {
 let started = std::time::Instant::now();

 // Held until return, so the `opened` check and the migrations run under the lock.
 let mut location = __DB_PATH.lock().unwrap();

 // We are handling the mutex by being thread_local, so SQLite can be opened in no-mutex mode; see:
 // http://sqlite.1065341.n5.nabble.com/SQLITE-OPEN-FULLMUTEX-vs-SQLITE-OPEN-NOMUTEX-td104785.html
 let flags = OpenFlags::SQLITE_OPEN_READ_WRITE
  | OpenFlags::SQLITE_OPEN_CREATE
  | OpenFlags::SQLITE_OPEN_NO_MUTEX;

 let mut connection = Connection::open_with_flags(&location.path, flags)
  .expect("rusqlite::Connection::open_with_flags");

 connection
  .execute_batch(
   r#"
    PRAGMA busy_timeout=3000;
    PRAGMA auto_vacuum=INCREMENTAL;
    PRAGMA journal_mode=WAL;
    PRAGMA wal_autocheckpoint=8000;
    PRAGMA synchronous=NORMAL;
   "#,
  )
  .expect("Execute PRAGMAs");

 let first_open = !location.opened;
 if first_open {
  run_migrations(&mut connection);
  location.opened = true;
 }

 log::info!("db opened in {:?}", started.elapsed());

 connection
}

// Per-thread SQLite connection, created by `open_db` the first time a thread accesses it.
// NOTE(review): `pub` but doc-hidden, presumably so macro expansions in other crates can
// reach it; confirm against the macro crate.
#[doc(hidden)]
thread_local! {
 pub static __TURBOSQL_DB: RefCell<Connection> = RefCell::new(open_db());
}

/// Chooses the file path at which Turbosql keeps its SQLite database.
///
/// Call this before the first use of any Turbosql macro: once the database has been
/// opened the path can no longer be changed.
///
/// # Errors
///
/// Returns an error if the database has already been opened.
///
/// # Panics
///
/// Panics if the database path mutex is poisoned.
pub fn set_db_path(path: &Path) -> Result<(), anyhow::Error> {
 let mut location = __DB_PATH.lock().unwrap();

 if location.opened {
  Err(anyhow::anyhow!("Trying to set path when DB is already opened"))
 } else {
  location.path = path.to_path_buf();
  Ok(())
 }
}

// Unit tests; the guard at the top of the file makes these build only with `--features test`,
// in which case the database path is `:memory:`.
#[cfg(test)]
mod tests {
 use super::*;
 // Smoke test: `checkpoint()` must return `Ok`; the returned counters are not inspected.
 #[test]
 fn test_checkpoint() {
  checkpoint().unwrap();
 }
}