reinhardt-db 0.1.2

Django-style database layer for Reinhardt framework
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
//! Schema Editor for migration execution
//!
//! Provides atomic transaction support for DDL operations,
//! similar to Django's schema_editor.
//!
//! # Overview
//!
//! The `SchemaEditor` wraps database connections and optionally manages
//! transactions for atomic migration execution. It follows Django's pattern
//! where migrations can be wrapped in transactions for databases that support
//! transactional DDL.
//!
//! # Database Support
//!
//! | Database | Transactional DDL | Notes |
//! |----------|-------------------|-------|
//! | PostgreSQL | Yes | DDL can be rolled back |
//! | SQLite | Yes | DDL can be rolled back |
//! | MySQL | No | DDL causes implicit commit |
//!
//! # Example
//!
//! ```ignore
//! use reinhardt_db::migrations::schema_editor::SchemaEditor;
//! use reinhardt_db::backends::{DatabaseConnection, DatabaseType};
//!
//! let connection = DatabaseConnection::connect_postgres("postgres://...").await?;
//! let mut editor = SchemaEditor::new(connection.clone(), true, DatabaseType::Postgres).await?;
//!
//! editor.execute("CREATE TABLE users (id SERIAL PRIMARY KEY)").await?;
//! editor.execute("ALTER TABLE users ADD COLUMN name TEXT").await?;
//!
//! // Commit all changes atomically
//! editor.finish().await?;
//! ```

use super::Result;
use crate::backends::{
	connection::DatabaseConnection,
	types::{DatabaseType, QueryValue, Row, TransactionExecutor},
};

/// Schema editor for executing DDL statements with optional transaction support
///
/// This struct wraps a database connection and optionally manages a transaction
/// for atomic migration execution. It follows Django's schema_editor pattern.
///
/// When `atomic` is `true` and the database supports transactional DDL,
/// all DDL operations are wrapped in a transaction that can be committed
/// or rolled back as a unit.
pub struct SchemaEditor {
	/// Database connection
	connection: DatabaseConnection,
	/// Transaction executor (if using atomic mode)
	executor: Option<Box<dyn TransactionExecutor>>,
	/// Whether this editor is using atomic transactions
	atomic: bool,
	/// Database type
	db_type: DatabaseType,
	/// Deferred SQL statements to execute at finish
	deferred_sql: Vec<String>,
}

impl SchemaEditor {
	/// Create a new schema editor
	///
	/// If `atomic` is true and the database supports transactional DDL,
	/// a transaction will be started automatically.
	///
	/// # Arguments
	///
	/// * `connection` - Database connection to use
	/// * `atomic` - Whether to wrap operations in a transaction
	/// * `db_type` - Database type for dialect-specific handling
	///
	/// # Returns
	///
	/// A new SchemaEditor instance
	///
	/// # Notes
	///
	/// If `atomic` is `true` but the database doesn't support transactional DDL
	/// (e.g., MySQL), a warning is logged and operations proceed without
	/// transaction wrapping.
	pub async fn new(
		connection: DatabaseConnection,
		atomic: bool,
		db_type: DatabaseType,
	) -> Result<Self> {
		let effective_atomic = atomic && db_type.supports_transactional_ddl();

		let executor = if effective_atomic {
			Some(connection.begin().await?)
		} else {
			if atomic && !db_type.supports_transactional_ddl() {
				tracing::warn!(
					"atomic=true requested but {:?} doesn't support transactional DDL. \
					 Proceeding without transaction wrapper.",
					db_type
				);
			}
			None
		};

		Ok(Self {
			connection,
			executor,
			atomic: effective_atomic,
			db_type,
			deferred_sql: Vec::new(),
		})
	}

	/// Execute a DDL statement
	///
	/// If in atomic mode, executes within the transaction.
	/// Otherwise, executes directly on the connection.
	///
	/// # Arguments
	///
	/// * `sql` - SQL statement to execute
	pub async fn execute(&mut self, sql: &str) -> Result<()> {
		if let Some(ref mut tx) = self.executor {
			tx.execute(sql, vec![]).await?;
			// SQLite requires a schema cache refresh after DDL within a transaction
			// to prevent SQLITE_SCHEMA (code 262) errors on subsequent DDL statements.
			if self.db_type == DatabaseType::Sqlite {
				tx.execute("SELECT 1", vec![]).await?;
			}
		} else {
			self.connection.execute(sql, vec![]).await?;
		}

		Ok(())
	}

	/// Fetch all rows for a read query, routed through the same connection as
	/// in-flight DDL so that in-transaction schema changes are visible.
	///
	/// When the editor is in atomic mode, the query is dispatched on the open
	/// transaction. Without this, a read issued through the pool would
	/// transparently land on a *different* physical connection and would not
	/// observe uncommitted DDL — the failure mode behind reinhardt-web#4447.
	pub async fn fetch_all(&mut self, sql: &str, params: Vec<QueryValue>) -> Result<Vec<Row>> {
		if let Some(ref mut tx) = self.executor {
			Ok(tx.fetch_all(sql, params).await?)
		} else {
			Ok(self.connection.fetch_all(sql, params).await?)
		}
	}

	/// Fetch a single optional row through the in-flight transaction (if any).
	///
	/// Mirrors [`Self::fetch_all`] for callers that expect zero or one row.
	pub async fn fetch_optional(
		&mut self,
		sql: &str,
		params: Vec<QueryValue>,
	) -> Result<Option<Row>> {
		if let Some(ref mut tx) = self.executor {
			Ok(tx.fetch_optional(sql, params).await?)
		} else {
			Ok(self.connection.fetch_optional(sql, params).await?)
		}
	}

	/// Check whether a table exists, routed through the editor's open
	/// transaction so that schema changes made earlier in the same atomic
	/// migration are visible to the check.
	///
	/// When the editor is non-atomic this falls back to the pool, identical
	/// in semantics to [`DatabaseConnection::fetch_optional`].
	///
	/// # SQLite-specific rationale (reinhardt-web#4584)
	///
	/// On SQLite, when an atomic migration has already executed a DDL on the
	/// transaction's connection, the schema cookie has been bumped. A read
	/// issued through the pool transparently lands on a *different* physical
	/// connection whose prepared-statement / schema cache is stale. SQLite
	/// then returns SQLITE_SCHEMA (code 262, "database schema is locked")
	/// instead of returning the up-to-date row. Routing the existence check
	/// through the same connection that performed the DDL avoids the stale
	/// cache entirely.
	pub async fn table_exists(&mut self, table_name: &str) -> Result<bool> {
		use reinhardt_query::prelude::{
			Alias, Cond, Expr, ExprTrait, MySqlQueryBuilder, PostgresQueryBuilder, Query,
			QueryStatementBuilder, SqliteQueryBuilder,
		};

		match self.db_type {
			DatabaseType::Postgres => {
				// Build an escaped/quoted literal query using reinhardt-query
				// (values are inlined via `to_string(QueryBuilder)`, not bound
				// as parameters), mirroring the introspection emitted by
				// `DatabaseMigrationExecutor` so the routing change here is
				// purely about which connection runs the query.
				let subquery = Query::select()
					.expr(Expr::asterisk())
					.from((Alias::new("information_schema"), Alias::new("tables")))
					.cond_where(
						Cond::all()
							.add(Expr::col(Alias::new("table_schema")).eq("public"))
							.add(Expr::col(Alias::new("table_name")).eq(table_name)),
					)
					.to_owned();

				// Explicitly alias the EXISTS expression so we can look it up
				// by a stable column key regardless of how a given adapter
				// exposes an unnamed expression.
				let query_str = format!(
					"SELECT EXISTS ({}) AS table_exists",
					subquery.to_string(PostgresQueryBuilder)
				);

				match self.fetch_optional(&query_str, vec![]).await? {
					Some(row) => match row.data.get("table_exists") {
						Some(QueryValue::Bool(b)) => Ok(*b),
						_ => Ok(false),
					},
					None => Ok(false),
				}
			}
			DatabaseType::Sqlite => {
				let query = Query::select()
					.column(Alias::new("name"))
					.from(Alias::new("sqlite_master"))
					.cond_where(
						Cond::all()
							.add(Expr::col(Alias::new("type")).eq("table"))
							.add(Expr::col(Alias::new("name")).eq(table_name)),
					)
					.to_owned();

				let query_str = query.to_string(SqliteQueryBuilder);
				let row = self.fetch_optional(&query_str, vec![]).await?;
				Ok(row.is_some())
			}
			DatabaseType::Mysql => {
				// `information_schema.tables` exposes canonical UPPER_CASE
				// column names (e.g. `TABLE_SCHEMA`, `TABLE_NAME`); use them
				// consistently to avoid surprises if identifier-quoting or
				// casing behaviour changes in MySQL configurations.
				let query = Query::select()
					.column(Alias::new("TABLE_NAME"))
					.from((Alias::new("information_schema"), Alias::new("tables")))
					.cond_where(
						Cond::all()
							.add(Expr::col(Alias::new("TABLE_SCHEMA")).eq(Expr::cust("DATABASE()")))
							.add(Expr::col(Alias::new("TABLE_NAME")).eq(table_name)),
					)
					.to_owned();

				let query_str = query.to_string(MySqlQueryBuilder);
				let row = self.fetch_optional(&query_str, vec![]).await?;
				Ok(row.is_some())
			}
		}
	}

	/// Defer SQL execution until finish()
	///
	/// Some operations need to be executed after all other operations
	/// in the migration (e.g., creating indexes on newly created columns).
	///
	/// # Arguments
	///
	/// * `sql` - SQL statement to defer
	pub fn defer(&mut self, sql: String) {
		self.deferred_sql.push(sql);
	}

	/// Finish the schema editing session
	///
	/// Executes any deferred SQL and commits the transaction if atomic.
	///
	/// # Returns
	///
	/// Ok(()) on success
	pub async fn finish(mut self) -> Result<()> {
		// Execute deferred SQL
		for sql in self.deferred_sql.drain(..) {
			if let Some(ref mut tx) = self.executor {
				tx.execute(&sql, vec![]).await?;
			} else {
				self.connection.execute(&sql, vec![]).await?;
			}
		}

		// Commit if in transaction
		if let Some(tx) = self.executor.take() {
			tx.commit().await?;
		}

		Ok(())
	}

	/// Rollback any changes (only effective for transactional DDL databases)
	///
	/// For databases that don't support transactional DDL (MySQL),
	/// this is a no-op as DDL statements have already been implicitly committed.
	pub async fn rollback(mut self) -> Result<()> {
		if let Some(tx) = self.executor.take() {
			tx.rollback().await?;
		}
		Ok(())
	}

	/// Check if this editor is using atomic transactions
	pub fn is_atomic(&self) -> bool {
		self.atomic
	}

	/// Get the database type
	pub fn database_type(&self) -> DatabaseType {
		self.db_type
	}

	/// Get a reference to the underlying connection
	///
	/// This can be used for operations that need direct connection access
	/// outside of the transaction (e.g., checking table existence).
	pub fn connection(&self) -> &DatabaseConnection {
		&self.connection
	}

	/// Disable foreign key checks (SQLite only)
	///
	/// This must be called BEFORE any table recreation operations that might
	/// temporarily break foreign key relationships. Remember to re-enable
	/// foreign keys after the operation completes.
	///
	/// # SQLite Foreign Key Handling
	///
	/// SQLite table recreation temporarily drops the original table, which
	/// can cause foreign key violations. This method disables foreign key
	/// enforcement during the recreation process.
	///
	/// # Returns
	///
	/// Ok(()) if successful, or an error if the operation fails.
	/// Returns Ok(()) immediately for non-SQLite databases.
	#[cfg(feature = "sqlite")]
	pub async fn disable_foreign_keys(&mut self) -> Result<()> {
		if !matches!(self.db_type, DatabaseType::Sqlite) {
			return Ok(());
		}

		tracing::debug!("Disabling SQLite foreign key checks");
		self.execute("PRAGMA foreign_keys = OFF").await?;
		Ok(())
	}

	/// Enable foreign key checks (SQLite only)
	///
	/// This should be called AFTER table recreation operations complete
	/// to restore foreign key enforcement.
	///
	/// # Returns
	///
	/// Ok(()) if successful, or an error if the operation fails.
	/// Returns Ok(()) immediately for non-SQLite databases.
	#[cfg(feature = "sqlite")]
	pub async fn enable_foreign_keys(&mut self) -> Result<()> {
		if !matches!(self.db_type, DatabaseType::Sqlite) {
			return Ok(());
		}

		tracing::debug!("Enabling SQLite foreign key checks");
		self.execute("PRAGMA foreign_keys = ON").await?;
		Ok(())
	}

	/// Check foreign key integrity (SQLite only)
	///
	/// This should be called after table recreation to verify that all
	/// foreign key relationships are valid. If violations are found,
	/// they will be returned as a vector of violation descriptions.
	///
	/// # Returns
	///
	/// A vector of foreign key violation descriptions (empty if no violations).
	/// Returns an empty vector immediately for non-SQLite databases.
	#[cfg(feature = "sqlite")]
	pub async fn check_foreign_key_integrity(&mut self) -> Result<Vec<String>> {
		if !matches!(self.db_type, DatabaseType::Sqlite) {
			return Ok(Vec::new());
		}

		tracing::debug!("Checking SQLite foreign key integrity");

		// PRAGMA foreign_key_check returns rows with:
		// table, rowid, parent_table, fkid
		let sql = "PRAGMA foreign_key_check";
		let rows = if let Some(ref mut tx) = self.executor {
			tx.fetch_all(sql, vec![]).await?
		} else {
			self.connection.fetch_all(sql, vec![]).await?
		};

		let violations: Vec<String> = rows
			.into_iter()
			.map(|row| {
				// PRAGMA foreign_key_check returns: table, rowid, parent, fkid
				let table: String = row.get("table").unwrap_or_default();
				let rowid: i64 = row.get("rowid").unwrap_or_default();
				let parent_table: String = row.get("parent").unwrap_or_default();
				format!(
					"FK violation in '{}' row {} referencing '{}'",
					table, rowid, parent_table
				)
			})
			.collect();

		if !violations.is_empty() {
			tracing::warn!("Foreign key violations found: {:?}", violations);
		}

		Ok(violations)
	}
}

#[cfg(test)]
mod tests {
	use super::*;

	#[test]
	fn test_database_type_transactional_ddl() {
		assert!(DatabaseType::Postgres.supports_transactional_ddl());
		assert!(DatabaseType::Sqlite.supports_transactional_ddl());
		assert!(!DatabaseType::Mysql.supports_transactional_ddl());
	}
}