1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
use itertools::{
 EitherOrBoth::{Both, Left, Right},
 Itertools,
};
use rusqlite::{Connection, OpenFlags};
use serde::Deserialize;
use std::cell::RefCell;
use std::path::{Path, PathBuf};
use std::sync::Mutex;

// these re-exports are used in macro expansions

#[doc(hidden)]
pub use once_cell::sync::Lazy;
#[doc(hidden)]
pub use rusqlite::{
 named_params, params, types::FromSql, types::FromSqlResult, types::ToSql, types::ToSqlOutput,
 types::Value, types::ValueRef, Error, OptionalExtension, Result,
};
#[doc(hidden)]
pub use serde::Serialize;
pub use turbosql_impl::{execute, select, Turbosql};

/// Alias for `Vec<u8>`.
///
/// This is currently a plain type alias, not a distinct wrapper type; it may one day be replaced by a type that impls the `Read`, `Write` and `Seek` traits.
pub type Blob = Vec<u8>;

/// `#[derive(Turbosql)]` generates impls for this trait.
///
/// The single-row methods report failure through `Result`; the `*_batch` methods return nothing.
pub trait Turbosql {
 /// Inserts this row into the database. `rowid` must be `None`. On success, returns the new `rowid`.
 fn insert(&self) -> Result<i64>;
 /// Batch counterpart of `insert`, taking a slice of values that can each be borrowed as `Self`.
 ///
 /// Returns nothing, so failures cannot be reported through the return value.
 /// NOTE(review): presumably inserts every element of `rows`; the actual behavior (transactionality, panics on error) lives in the derive-generated impl -- confirm there.
 fn insert_batch<T: AsRef<Self>>(rows: &[T]);
 /// Updates this existing row in the database, based on `rowid`, which must be `Some`. All fields are overwritten in the database. On success, returns the number of rows updated, which should be 1.
 fn update(&self) -> Result<usize>;
 /// Batch counterpart of `update`, taking a slice of values that can each be borrowed as `Self`.
 ///
 /// Returns nothing, so failures cannot be reported through the return value.
 /// NOTE(review): presumably updates every element of `rows` by its `rowid`; the actual behavior lives in the derive-generated impl -- confirm there.
 fn update_batch<T: AsRef<Self>>(rows: &[T]);
}

/// Error type for failures raised by Turbosql itself, as opposed to the re-exported `rusqlite` `Error`.
#[derive(thiserror::Error, Debug)]
pub enum TurbosqlError {
 /// A failure described only by a static message; displayed as `Turbosql Error: <message>`.
 #[error("Turbosql Error: {0}")]
 OtherError(&'static str),
}

/// Deserialized form of the embedded `migrations.toml` file read by `run_migrations`.
///
/// Field names are the TOML keys, so they must not be renamed without changing the file format.
// NOTE(review): `dead_code` is presumably allowed because only `migrations_append_only` is read
// in this file -- confirm before removing the attribute.
#[allow(dead_code)]
#[derive(Clone, Debug, Deserialize, Default)]
struct MigrationsToml {
 /// Ordered list of migration SQL statements; `None` when the key is absent.
 /// Entries starting with `--` are treated as comments by `run_migrations`.
 migrations_append_only: Option<Vec<String>>,
 /// Generated schema text; not read by the code in this file.
 output_generated_schema_for_your_information_do_not_edit: Option<String>,
}

/// Process-wide record of where the database lives and whether it has been opened yet.
#[derive(Clone, Debug, Default)]
struct DbPath {
 /// Location of the SQLite database. `None` until `set_db_path` stores a path or `open_db` picks a default.
 path: Option<PathBuf>,
 /// Set to `true` by `open_db` once migrations have been run; after that `set_db_path` refuses to change `path`.
 opened: bool,
}

/// Global, lazily-initialized database path state, shared by all threads behind a `Mutex`.
/// Starts out as `DbPath::default()` (no path, not opened).
static __DB_PATH: Lazy<Mutex<DbPath>> = Lazy::new(|| Mutex::new(DbPath::default()));

/// Returns the current wall-clock time, expressed as whole milliseconds elapsed since the UNIX epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the UNIX epoch.
pub fn now_ms() -> i64 {
 let since_epoch = std::time::UNIX_EPOCH.elapsed().unwrap();
 since_epoch.as_millis() as i64
}

/// Brings the database behind `conn` up to date with the migration list embedded at compile time.
///
/// Steps, all performed inside one exclusive transaction:
/// 1. Decode the target migration list from the embedded TOML (which file is chosen at compile time).
/// 2. Make sure the `_turbosql_migrations` bookkeeping table exists (renaming a legacy
///    `turbosql_migrations` table if present, creating the table otherwise).
/// 3. Compare already-applied migrations with the target list position by position, then execute
///    and record every target migration that has not been applied yet.
///
/// # Panics
///
/// Panics on any SQL or TOML decoding failure, if an applied migration differs from the target
/// at the same position, or if more migrations are applied than the target list contains.
fn run_migrations(conn: &mut Connection) {
 // Pick the migrations source at compile time: empty for doc builds, a fixed test file for the
 // "test" feature, otherwise the migrations.toml found in OUT_DIR.
 cfg_if::cfg_if! {
  if #[cfg(doc)] {
   // if these are what's run in doctests, could add a test struct here to scaffold one-liner tests
   let toml_decoded: MigrationsToml = MigrationsToml::default();
  } else if #[cfg(feature = "test")] {
   let toml_decoded: MigrationsToml = toml::from_str(include_str!("../../test.migrations.toml")).unwrap();
  } else {
   let toml_decoded: MigrationsToml = toml::from_str(include_str!(concat!(env!("OUT_DIR"), "/migrations.toml"))).expect("Unable to decode embedded migrations.toml");
  }
 };

 // A missing key means "no migrations".
 let target_migrations = toml_decoded.migrations_append_only.unwrap_or_default();

 // filter out comments
 let target_migrations: Vec<_> =
  target_migrations.into_iter().filter(|m| !m.starts_with("--")).collect();

 // The transaction is managed by hand; the matching COMMIT is the last statement of this function.
 conn.execute("BEGIN EXCLUSIVE TRANSACTION", params![]).unwrap();

 // Best-effort rename of the legacy table name; the result is deliberately ignored
 // (presumably because the statement fails whenever the old table does not exist).
 let _ = conn.execute("ALTER TABLE turbosql_migrations RENAME TO _turbosql_migrations", params![]);

 // Look up the bookkeeping table's definition only to learn whether it exists.
 let result = conn.query_row(
  "SELECT sql FROM sqlite_master WHERE name = ?",
  params!["_turbosql_migrations"],
  |row| {
   let sql: String = row.get(0).unwrap();
   Ok(sql)
  },
 );

 match result {
  Err(rusqlite::Error::QueryReturnedNoRows) => {
   // no migrations table exists yet, create
   conn
    .execute_batch(
     r#"CREATE TABLE _turbosql_migrations (rowid INTEGER PRIMARY KEY, migration TEXT NOT NULL) STRICT"#,
    )
    .expect("CREATE TABLE _turbosql_migrations");
  }
  Err(err) => {
   panic!("Could not query sqlite_master table: {}", err);
  }
  // Table already exists; its SQL text is not inspected.
  Ok(_) => (),
 }

 // Migrations already recorded in the database, in application order, with comment entries dropped
 // so they line up with the filtered target list.
 let applied_migrations = conn
  .prepare("SELECT migration FROM _turbosql_migrations ORDER BY rowid")
  .unwrap()
  .query_map(params![], |row| Ok(row.get(0).unwrap()))
  .unwrap()
  .map(|x: Result<String, _>| x.unwrap())
  .filter(|m| !m.starts_with("--"))
  .collect::<Vec<String>>();

 // execute migrations

 // Walk both lists in lockstep: `Both` = already applied (must match exactly), `Left` = applied
 // but no longer in the target list, `Right` = in the target list but not applied yet.
 applied_migrations.iter().zip_longest(&target_migrations).for_each(|item| match item {
  Both(a, b) => {
   if a != b {
    panic!("Mismatch in Turbosql migrations! {:?} != {:?}", a, b)
   }
  }
  Left(_) => panic!("More migrations are applied than target"),
  Right(migration) => {
   // eprintln!("insert -> {:#?}", migration);
   // `target_migrations` was already stripped of `--` entries above, so this guard always holds here.
   if !migration.starts_with("--") {
    conn.execute(migration, params![]).unwrap();
   }
   // Record the migration so later runs see it as applied.
   conn
    .execute("INSERT INTO _turbosql_migrations(migration) VALUES(?)", params![migration])
    .unwrap();
  }
 });

 // TODO: verify schema against output_generated_schema_for_your_information_do_not_edit

 //    if sql != create_sql {
 //     println!("{}", sql);
 //     println!("{}", create_sql);
 //     panic!("Turbosql sqlite schema does not match! Delete database file to continue.");
 //    }

 conn.execute("COMMIT", params![]).unwrap();
}

/// The three values returned by SQLite's `PRAGMA wal_checkpoint`, in column order.
#[derive(Debug)]
pub struct CheckpointResult {
 /// Should always be 0. (Checkpoint is run in PASSIVE mode.)
 pub busy: i64,
 /// The number of modified pages that have been written to the write-ahead log file.
 pub log: i64,
 /// The number of pages in the write-ahead log file that have been successfully moved back into the database file at the conclusion of the checkpoint.
 pub checkpointed: i64,
}

/// Runs a `PASSIVE` write-ahead-log checkpoint on the database and reports its counters.
///
/// A dedicated connection is opened for the checkpoint. If no other threads have open connections, this will clean up the `-wal` and `-shm` files as well.
///
/// # Errors
///
/// Returns the underlying `rusqlite` error if the connection cannot be opened or the checkpoint statement fails.
///
/// # Panics
///
/// Panics if the global path mutex is poisoned, or if no database path has been recorded yet.
pub fn checkpoint() -> Result<CheckpointResult> {
 let started_at = std::time::Instant::now();

 // The guard stays alive until the function returns, exactly covering the checkpoint.
 let state = __DB_PATH.lock().unwrap();
 let location = state.path.as_ref().unwrap();
 let flags = OpenFlags::SQLITE_OPEN_READ_WRITE | OpenFlags::SQLITE_OPEN_NO_MUTEX;
 let conn = Connection::open_with_flags(location, flags)?;

 // The pragma yields a single row: (busy, log, checkpointed).
 let outcome = conn.query_row("PRAGMA wal_checkpoint(PASSIVE)", params![], |row| {
  let busy: i64 = row.get(0)?;
  let log: i64 = row.get(1)?;
  let checkpointed: i64 = row.get(2)?;
  Ok(CheckpointResult { busy, log, checkpointed })
 })?;

 log::info!("db checkpointed in {:?} {:#?}", started_at.elapsed(), outcome);

 Ok(outcome)
}

/// Opens a new SQLite connection to the Turbosql database.
///
/// If no path has been recorded yet, a default is chosen and stored: an in-memory database when
/// the `test` feature is enabled, otherwise `<data dir>/<executable stem>.sqlite` inside the
/// per-application data directory (which is created if needed). Connection PRAGMAs are applied
/// on every call; migrations are run only the first time a connection is opened in this process.
///
/// # Panics
///
/// Panics if the global path mutex is poisoned, the default location cannot be determined or
/// created, the connection cannot be opened, or the PRAGMAs fail.
fn open_db() -> Connection {
 let mut state = __DB_PATH.lock().unwrap();

 if state.path.is_none() {
  #[cfg(feature = "test")]
  let default_location = Path::new(":memory:").to_path_buf();

  #[cfg(not(feature = "test"))]
  let default_location = {
   let exe = std::env::current_exe().unwrap();
   let stem = exe.file_stem().unwrap().to_owned();
   let stem_lossy = stem.to_string_lossy();

   let project = directories_next::ProjectDirs::from("org", &stem_lossy, &stem_lossy).unwrap();
   let data_dir = project.data_dir().to_owned();

   std::fs::create_dir_all(&data_dir).unwrap();

   data_dir.join(stem).with_extension("sqlite")
  };

  state.path = Some(default_location);
 }

 let location = state.path.as_ref().unwrap();
 log::debug!("opening db at {:?}", location);

 // We are handling the mutex by being thread_local, so SQLite can be opened in no-mutex mode; see:
 // https://www.mail-archive.com/sqlite-users@mailinglists.sqlite.org/msg112907.html
 let flags = OpenFlags::SQLITE_OPEN_READ_WRITE
  | OpenFlags::SQLITE_OPEN_CREATE
  | OpenFlags::SQLITE_OPEN_NO_MUTEX;

 let mut conn =
  Connection::open_with_flags(location, flags).expect("rusqlite::Connection::open_with_flags");

 // Per-connection settings, applied to every connection that is opened.
 conn
  .execute_batch(
   r#"
    PRAGMA busy_timeout=3000;
    PRAGMA auto_vacuum=INCREMENTAL;
    PRAGMA journal_mode=WAL;
    PRAGMA wal_autocheckpoint=8000;
    PRAGMA synchronous=NORMAL;
   "#,
  )
  .expect("Execute PRAGMAs");

 // Only the very first connection runs migrations; the flag lives in the global state.
 if !state.opened {
  run_migrations(&mut conn);
  state.opened = true;
 }

 conn
}

// One SQLite connection per thread, created by `open_db()` the first time the thread accesses it.
// Wrapped in a `RefCell` so callers can borrow the connection mutably through the thread-local.
// Public (but hidden from docs) rather than private.
// NOTE(review): presumably public so that macro expansions in downstream crates can reach it -- confirm.
thread_local! {
 #[doc(hidden)]
 pub static __TURBOSQL_DB: RefCell<Connection> = RefCell::new(open_db());
}

/// Chooses the file path at which Turbosql stores its underlying SQLite database.
///
/// Call this before the first use of any Turbosql macro; once the database has been opened the path can no longer be changed.
///
/// # Errors
///
/// Returns `TurbosqlError::OtherError` if the database has already been opened.
///
/// # Panics
///
/// Panics if the global path mutex is poisoned.
pub fn set_db_path(path: &Path) -> Result<(), TurbosqlError> {
 let mut state = __DB_PATH.lock().unwrap();

 if state.opened {
  Err(TurbosqlError::OtherError("Trying to set path when DB is already opened"))
 } else {
  state.path = Some(path.to_path_buf());
  Ok(())
 }
}