1pub use sqlx;
2pub use tracing;
3
4pub mod prelude {
5 pub use crate::{
6 Executor, IntoExecutor, Model, ModelHooks, ModelSchema, ModelValidation, Premix,
7 RuntimeProfile, UpdateResult, test_utils::MockDatabase, test_utils::with_test_transaction,
8 };
9}
10use sqlx::{Database, Executor as SqlxExecutor, IntoArguments};
11use std::time::{Duration, Instant};
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq)]
14pub enum RuntimeProfile {
15 Server,
16 Serverless,
17}
18
19pub struct Premix;
20pub struct RawQuery<'q> {
21 sql: &'q str,
22}
23
24#[doc(hidden)]
25pub fn build_placeholders<DB: SqlDialect>(start_index: usize, count: usize) -> String {
26 let mut out = String::new();
27 for i in 0..count {
28 if i > 0 {
29 out.push_str(", ");
30 }
31 out.push_str(&DB::placeholder(start_index + i));
32 }
33 out
34}
35pub mod migrator;
36pub mod schema;
37pub mod test_utils;
38pub use migrator::{Migration, Migrator};
39pub use schema::{
40 ColumnDiff, ColumnNullabilityDiff, ColumnPrimaryKeyDiff, ColumnTypeDiff, ModelSchema,
41 SchemaColumn, SchemaDiff, SchemaForeignKey, SchemaIndex, SchemaTable,
42 format_schema_diff_summary,
43};
44#[cfg(feature = "postgres")]
45pub use schema::{diff_postgres_schema, introspect_postgres_schema, postgres_migration_sql};
46#[cfg(feature = "sqlite")]
47pub use schema::{diff_sqlite_schema, introspect_sqlite_schema, sqlite_migration_sql};
48pub use test_utils::{MockDatabase, with_test_transaction};
49
50pub trait SqlDialect: Database + Sized + Send + Sync
53where
54 Self::Connection: Send,
55{
56 fn placeholder(n: usize) -> String;
57 fn auto_increment_pk() -> &'static str;
58 fn rows_affected(res: &Self::QueryResult) -> u64;
59 fn last_insert_id(res: &Self::QueryResult) -> i64;
60 fn supports_returning() -> bool {
61 false
62 }
63
64 fn current_timestamp_fn() -> &'static str {
65 "CURRENT_TIMESTAMP"
66 }
67 fn int_type() -> &'static str {
68 "INTEGER"
69 }
70 fn bigint_type() -> &'static str {
71 "BIGINT"
72 }
73 fn text_type() -> &'static str {
74 "TEXT"
75 }
76 fn bool_type() -> &'static str {
77 "BOOLEAN"
78 }
79 fn float_type() -> &'static str {
80 "REAL"
81 }
82 fn blob_type() -> &'static str {
83 "BLOB"
84 }
85}
86
87#[cfg(feature = "sqlite")]
88impl SqlDialect for sqlx::Sqlite {
89 fn placeholder(_n: usize) -> String {
90 "?".to_string()
91 }
92 fn auto_increment_pk() -> &'static str {
93 "INTEGER PRIMARY KEY"
94 }
95 fn bigint_type() -> &'static str {
96 "INTEGER"
97 }
98 fn blob_type() -> &'static str {
99 "BLOB"
100 }
101 fn rows_affected(res: &sqlx::sqlite::SqliteQueryResult) -> u64 {
102 res.rows_affected()
103 }
104 fn last_insert_id(res: &sqlx::sqlite::SqliteQueryResult) -> i64 {
105 res.last_insert_rowid()
106 }
107 fn supports_returning() -> bool {
108 false
109 }
110}
111
112#[cfg(feature = "postgres")]
113impl SqlDialect for sqlx::Postgres {
114 fn placeholder(n: usize) -> String {
115 format!("${}", n)
116 }
117 fn auto_increment_pk() -> &'static str {
118 "SERIAL PRIMARY KEY"
119 }
120 fn int_type() -> &'static str {
121 "INTEGER"
122 }
123 fn bigint_type() -> &'static str {
124 "BIGINT"
125 }
126 fn text_type() -> &'static str {
127 "TEXT"
128 }
129 fn bool_type() -> &'static str {
130 "BOOLEAN"
131 }
132 fn float_type() -> &'static str {
133 "DOUBLE PRECISION"
134 }
135 fn blob_type() -> &'static str {
136 "BYTEA"
137 }
138 fn rows_affected(res: &sqlx::postgres::PgQueryResult) -> u64 {
139 res.rows_affected()
140 }
141 fn last_insert_id(_res: &sqlx::postgres::PgQueryResult) -> i64 {
142 0
143 }
144 fn supports_returning() -> bool {
145 true
146 }
147}
148
149#[cfg(feature = "mysql")]
150impl SqlDialect for sqlx::MySql {
151 fn placeholder(_n: usize) -> String {
152 "?".to_string()
153 }
154 fn auto_increment_pk() -> &'static str {
155 "INTEGER AUTO_INCREMENT PRIMARY KEY"
156 }
157 fn bigint_type() -> &'static str {
158 "BIGINT"
159 }
160 fn blob_type() -> &'static str {
161 "LONGBLOB"
162 }
163 fn rows_affected(res: &sqlx::mysql::MySqlQueryResult) -> u64 {
164 res.rows_affected()
165 }
166 fn last_insert_id(res: &sqlx::mysql::MySqlQueryResult) -> i64 {
167 res.last_insert_id() as i64
168 }
169 fn supports_returning() -> bool {
170 false
171 }
172}
173
174pub enum Executor<'a, DB: Database> {
176 Pool(&'a sqlx::Pool<DB>),
177 Conn(&'a mut DB::Connection),
178}
179
180unsafe impl<'a, DB: Database> Send for Executor<'a, DB> where DB::Connection: Send {}
181unsafe impl<'a, DB: Database> Sync for Executor<'a, DB> where DB::Connection: Sync {}
182
183impl<'a, DB: Database> From<&'a sqlx::Pool<DB>> for Executor<'a, DB> {
184 fn from(pool: &'a sqlx::Pool<DB>) -> Self {
185 Self::Pool(pool)
186 }
187}
188
189impl<'a, DB: Database> From<&'a mut DB::Connection> for Executor<'a, DB> {
190 fn from(conn: &'a mut DB::Connection) -> Self {
191 Self::Conn(conn)
192 }
193}
194
195pub trait IntoExecutor<'a>: Send + 'a {
196 type DB: SqlDialect;
197 fn into_executor(self) -> Executor<'a, Self::DB>;
198}
199
200impl<'a, DB: SqlDialect> IntoExecutor<'a> for &'a sqlx::Pool<DB> {
201 type DB = DB;
202 fn into_executor(self) -> Executor<'a, DB> {
203 Executor::Pool(self)
204 }
205}
206
207#[cfg(feature = "sqlite")]
208impl<'a> IntoExecutor<'a> for &'a mut sqlx::SqliteConnection {
209 type DB = sqlx::Sqlite;
210 fn into_executor(self) -> Executor<'a, Self::DB> {
211 Executor::Conn(self)
212 }
213}
214
215#[cfg(feature = "postgres")]
216impl<'a> IntoExecutor<'a> for &'a mut sqlx::postgres::PgConnection {
217 type DB = sqlx::Postgres;
218 fn into_executor(self) -> Executor<'a, Self::DB> {
219 Executor::Conn(self)
220 }
221}
222
223impl<'a, DB: SqlDialect> IntoExecutor<'a> for Executor<'a, DB> {
224 type DB = DB;
225 fn into_executor(self) -> Executor<'a, DB> {
226 self
227 }
228}
229
230impl<'a, DB: Database> Executor<'a, DB> {
231 pub async fn execute<'q, A>(
232 &mut self,
233 query: sqlx::query::Query<'q, DB, A>,
234 ) -> Result<DB::QueryResult, sqlx::Error>
235 where
236 A: sqlx::IntoArguments<'q, DB> + 'q,
237 DB: SqlDialect,
238 for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
239 {
240 match self {
241 Self::Pool(pool) => query.execute(*pool).await,
242 Self::Conn(conn) => query.execute(&mut **conn).await,
243 }
244 }
245
246 pub async fn fetch_all<'q, T, A>(
247 &mut self,
248 query: sqlx::query::QueryAs<'q, DB, T, A>,
249 ) -> Result<Vec<T>, sqlx::Error>
250 where
251 T: for<'r> sqlx::FromRow<'r, DB::Row> + Send + Unpin,
252 A: sqlx::IntoArguments<'q, DB> + 'q,
253 DB: SqlDialect,
254 for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
255 {
256 match self {
257 Self::Pool(pool) => query.fetch_all(*pool).await,
258 Self::Conn(conn) => query.fetch_all(&mut **conn).await,
259 }
260 }
261
262 pub async fn fetch_optional<'q, T, A>(
263 &mut self,
264 query: sqlx::query::QueryAs<'q, DB, T, A>,
265 ) -> Result<Option<T>, sqlx::Error>
266 where
267 T: for<'r> sqlx::FromRow<'r, DB::Row> + Send + Unpin,
268 A: sqlx::IntoArguments<'q, DB> + 'q,
269 DB: SqlDialect,
270 for<'c> &'c mut DB::Connection: sqlx::Executor<'c, Database = DB>,
271 {
272 match self {
273 Self::Pool(pool) => query.fetch_optional(*pool).await,
274 Self::Conn(conn) => query.fetch_optional(&mut **conn).await,
275 }
276 }
277}
278
279#[inline(never)]
281fn default_model_hook_result() -> Result<(), sqlx::Error> {
282 Ok(())
283}
284
285pub trait ModelHooks {
286 #[inline(never)]
287 fn before_save(&mut self) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send {
288 async move { default_model_hook_result() }
289 }
290 #[inline(never)]
291 fn after_save(&mut self) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send {
292 async move { default_model_hook_result() }
293 }
294}
295
296#[derive(Debug, PartialEq)]
298pub enum UpdateResult {
299 Success,
300 VersionConflict,
301 NotFound,
302 NotImplemented,
303}
304
305#[derive(Debug, Clone)]
307pub struct ValidationError {
308 pub field: String,
309 pub message: String,
310}
311
312pub trait ModelValidation {
313 fn validate(&self) -> Result<(), Vec<ValidationError>> {
314 Ok(())
315 }
316}
317
318pub trait Model<DB: Database>: Sized + Send + Sync + Unpin
319where
320 DB: SqlDialect,
321 for<'r> Self: sqlx::FromRow<'r, DB::Row>,
322{
323 fn table_name() -> &'static str;
324 fn create_table_sql() -> String;
325 fn list_columns() -> Vec<String>;
326
327 fn save<'a, E>(
329 &'a mut self,
330 executor: E,
331 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
332 where
333 E: IntoExecutor<'a, DB = DB>;
334
335 fn update<'a, E>(
336 &'a mut self,
337 executor: E,
338 ) -> impl std::future::Future<Output = Result<UpdateResult, sqlx::Error>> + Send
339 where
340 E: IntoExecutor<'a, DB = DB>;
341
342 fn delete<'a, E>(
344 &'a mut self,
345 executor: E,
346 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
347 where
348 E: IntoExecutor<'a, DB = DB>;
349 fn has_soft_delete() -> bool;
350 fn sensitive_fields() -> &'static [&'static str] {
351 &[]
352 }
353
354 fn find_by_id<'a, E>(
356 executor: E,
357 id: i32,
358 ) -> impl std::future::Future<Output = Result<Option<Self>, sqlx::Error>> + Send
359 where
360 E: IntoExecutor<'a, DB = DB>;
361
362 fn raw_sql<'q>(
364 sql: &'q str,
365 ) -> sqlx::query::QueryAs<'q, DB, Self, <DB as Database>::Arguments<'q>> {
366 sqlx::query_as::<DB, Self>(sql)
367 }
368
369 #[inline(never)]
370 fn eager_load<'a>(
371 _models: &mut [Self],
372 _relation: &str,
373 _executor: Executor<'a, DB>,
374 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send {
375 async move { default_model_hook_result() }
376 }
377 fn find<'a, E>(executor: E) -> QueryBuilder<'a, Self, DB>
378 where
379 E: IntoExecutor<'a, DB = DB>,
380 {
381 QueryBuilder::new(executor.into_executor())
382 }
383
384 fn find_in_pool(pool: &sqlx::Pool<DB>) -> QueryBuilder<'_, Self, DB> {
386 QueryBuilder::new(Executor::Pool(pool))
387 }
388
389 fn find_in_tx(conn: &mut DB::Connection) -> QueryBuilder<'_, Self, DB> {
390 QueryBuilder::new(Executor::Conn(conn))
391 }
392}
393
394#[doc(hidden)]
395#[derive(Debug, Clone)]
396pub enum BindValue {
397 String(String),
398 I64(i64),
399 F64(f64),
400 Bool(bool),
401 Null,
402}
403
404impl BindValue {
405 fn to_log_string(&self) -> String {
406 match self {
407 BindValue::String(v) => v.clone(),
408 BindValue::I64(v) => v.to_string(),
409 BindValue::F64(v) => v.to_string(),
410 BindValue::Bool(v) => v.to_string(),
411 BindValue::Null => "NULL".to_string(),
412 }
413 }
414}
415
416#[cfg(feature = "metrics")]
417fn record_query_metrics(operation: &str, table: &str, elapsed: Duration) {
418 let elapsed_ms = elapsed.as_secs_f64() * 1000.0;
419 let labels = [
420 ("operation", operation.to_string()),
421 ("table", table.to_string()),
422 ];
423 metrics::histogram!("premix.query.duration_ms", &labels).record(elapsed_ms);
424 metrics::counter!("premix.query.count", &labels).increment(1);
425}
426
427#[cfg(not(feature = "metrics"))]
428fn record_query_metrics(_operation: &str, _table: &str, _elapsed: Duration) {}
429
430impl From<String> for BindValue {
431 fn from(value: String) -> Self {
432 Self::String(value)
433 }
434}
435
436impl From<&str> for BindValue {
437 fn from(value: &str) -> Self {
438 Self::String(value.to_string())
439 }
440}
441
442impl From<i32> for BindValue {
443 fn from(value: i32) -> Self {
444 Self::I64(value as i64)
445 }
446}
447
448impl From<i64> for BindValue {
449 fn from(value: i64) -> Self {
450 Self::I64(value)
451 }
452}
453
454impl From<f64> for BindValue {
455 fn from(value: f64) -> Self {
456 Self::F64(value)
457 }
458}
459
460impl From<bool> for BindValue {
461 fn from(value: bool) -> Self {
462 Self::Bool(value)
463 }
464}
465
466impl From<Option<String>> for BindValue {
467 fn from(value: Option<String>) -> Self {
468 match value {
469 Some(v) => Self::String(v),
470 None => Self::Null,
471 }
472 }
473}
474
475#[derive(Debug, Clone)]
476enum FilterExpr {
477 Raw(String),
478 Compare {
479 column: String,
480 op: String,
481 values: Vec<BindValue>,
482 },
483 NullCheck {
484 column: String,
485 is_null: bool,
486 },
487}
488
489fn bind_value_query<'q, DB>(
490 query: sqlx::query::Query<'q, DB, <DB as Database>::Arguments<'q>>,
491 value: BindValue,
492) -> sqlx::query::Query<'q, DB, <DB as Database>::Arguments<'q>>
493where
494 DB: Database,
495 String: sqlx::Encode<'q, DB> + sqlx::Type<DB>,
496 i64: sqlx::Encode<'q, DB> + sqlx::Type<DB>,
497 f64: sqlx::Encode<'q, DB> + sqlx::Type<DB>,
498 bool: sqlx::Encode<'q, DB> + sqlx::Type<DB>,
499 Option<String>: sqlx::Encode<'q, DB> + sqlx::Type<DB>,
500{
501 match value {
502 BindValue::String(v) => query.bind(v),
503 BindValue::I64(v) => query.bind(v),
504 BindValue::F64(v) => query.bind(v),
505 BindValue::Bool(v) => query.bind(v),
506 BindValue::Null => query.bind(Option::<String>::None),
507 }
508}
509
510fn bind_value_query_as<'q, DB, T>(
511 query: sqlx::query::QueryAs<'q, DB, T, <DB as Database>::Arguments<'q>>,
512 value: BindValue,
513) -> sqlx::query::QueryAs<'q, DB, T, <DB as Database>::Arguments<'q>>
514where
515 DB: Database,
516 String: sqlx::Encode<'q, DB> + sqlx::Type<DB>,
517 i64: sqlx::Encode<'q, DB> + sqlx::Type<DB>,
518 f64: sqlx::Encode<'q, DB> + sqlx::Type<DB>,
519 bool: sqlx::Encode<'q, DB> + sqlx::Type<DB>,
520 Option<String>: sqlx::Encode<'q, DB> + sqlx::Type<DB>,
521{
522 match value {
523 BindValue::String(v) => query.bind(v),
524 BindValue::I64(v) => query.bind(v),
525 BindValue::F64(v) => query.bind(v),
526 BindValue::Bool(v) => query.bind(v),
527 BindValue::Null => query.bind(Option::<String>::None),
528 }
529}
530
531pub struct QueryBuilder<'a, T, DB: Database> {
532 executor: Executor<'a, DB>,
533 filters: Vec<FilterExpr>,
534 limit: Option<i32>,
535 offset: Option<i32>,
536 includes: Vec<String>,
537 include_deleted: bool, allow_unsafe: bool,
539 has_raw_filter: bool,
540 _marker: std::marker::PhantomData<T>,
541}
542
543impl<'a, T, DB> QueryBuilder<'a, T, DB>
544where
545 DB: SqlDialect,
546 T: Model<DB>,
547{
548 pub fn new(executor: Executor<'a, DB>) -> Self {
549 Self {
550 executor,
551 filters: Vec::new(),
552 limit: None,
553 offset: None,
554 includes: Vec::new(),
555 include_deleted: false,
556 allow_unsafe: false,
557 has_raw_filter: false,
558 _marker: std::marker::PhantomData,
559 }
560 }
561
562 pub fn filter(mut self, condition: impl Into<String>) -> Self {
563 self.filters.push(FilterExpr::Raw(condition.into()));
564 self.has_raw_filter = true;
565 self
566 }
567
568 pub fn filter_raw(self, condition: impl Into<String>) -> Self {
569 self.filter(condition)
570 }
571
572 pub fn filter_eq(mut self, column: &str, value: impl Into<BindValue>) -> Self {
573 self.filters.push(FilterExpr::Compare {
574 column: column.to_string(),
575 op: "=".to_string(),
576 values: vec![value.into()],
577 });
578 self
579 }
580
581 pub fn filter_ne(mut self, column: &str, value: impl Into<BindValue>) -> Self {
582 self.filters.push(FilterExpr::Compare {
583 column: column.to_string(),
584 op: "!=".to_string(),
585 values: vec![value.into()],
586 });
587 self
588 }
589
590 pub fn filter_lt(mut self, column: &str, value: impl Into<BindValue>) -> Self {
591 self.filters.push(FilterExpr::Compare {
592 column: column.to_string(),
593 op: "<".to_string(),
594 values: vec![value.into()],
595 });
596 self
597 }
598
599 pub fn filter_lte(mut self, column: &str, value: impl Into<BindValue>) -> Self {
600 self.filters.push(FilterExpr::Compare {
601 column: column.to_string(),
602 op: "<=".to_string(),
603 values: vec![value.into()],
604 });
605 self
606 }
607
608 pub fn filter_gt(mut self, column: &str, value: impl Into<BindValue>) -> Self {
609 self.filters.push(FilterExpr::Compare {
610 column: column.to_string(),
611 op: ">".to_string(),
612 values: vec![value.into()],
613 });
614 self
615 }
616
617 pub fn filter_gte(mut self, column: &str, value: impl Into<BindValue>) -> Self {
618 self.filters.push(FilterExpr::Compare {
619 column: column.to_string(),
620 op: ">=".to_string(),
621 values: vec![value.into()],
622 });
623 self
624 }
625
626 pub fn filter_like(mut self, column: &str, value: impl Into<BindValue>) -> Self {
627 self.filters.push(FilterExpr::Compare {
628 column: column.to_string(),
629 op: "LIKE".to_string(),
630 values: vec![value.into()],
631 });
632 self
633 }
634
635 pub fn filter_is_null(mut self, column: &str) -> Self {
636 self.filters.push(FilterExpr::NullCheck {
637 column: column.to_string(),
638 is_null: true,
639 });
640 self
641 }
642
643 pub fn filter_is_not_null(mut self, column: &str) -> Self {
644 self.filters.push(FilterExpr::NullCheck {
645 column: column.to_string(),
646 is_null: false,
647 });
648 self
649 }
650
651 pub fn filter_in<I, V>(mut self, column: &str, values: I) -> Self
652 where
653 I: IntoIterator<Item = V>,
654 V: Into<BindValue>,
655 {
656 let values = values.into_iter().map(Into::into).collect();
657 self.filters.push(FilterExpr::Compare {
658 column: column.to_string(),
659 op: "IN".to_string(),
660 values,
661 });
662 self
663 }
664
665 fn format_filters_for_log(&self) -> String {
666 let sensitive_fields = T::sensitive_fields();
667 let mut clauses = Vec::new();
668
669 for filter in &self.filters {
670 match filter {
671 FilterExpr::Raw(_) => {
672 clauses.push("RAW(<redacted>)".to_string());
673 }
674 FilterExpr::Compare { column, op, values } => {
675 let is_sensitive = sensitive_fields.iter().any(|&f| f == column);
676 if op == "IN" {
677 if values.is_empty() {
678 clauses.push("1=0".to_string());
679 continue;
680 }
681 let rendered = values
682 .iter()
683 .map(|value| {
684 if is_sensitive {
685 "***".to_string()
686 } else {
687 value.to_log_string()
688 }
689 })
690 .collect::<Vec<_>>()
691 .join(", ");
692 clauses.push(format!("{} IN ({})", column, rendered));
693 } else {
694 let rendered = if is_sensitive {
695 "***".to_string()
696 } else if let Some(value) = values.first() {
697 value.to_log_string()
698 } else {
699 "NULL".to_string()
700 };
701 clauses.push(format!("{} {} {}", column, op, rendered));
702 }
703 }
704 FilterExpr::NullCheck { column, is_null } => {
705 if *is_null {
706 clauses.push(format!("{} IS NULL", column));
707 } else {
708 clauses.push(format!("{} IS NOT NULL", column));
709 }
710 }
711 }
712 }
713
714 if T::has_soft_delete() && !self.include_deleted {
715 clauses.push("deleted_at IS NULL".to_string());
716 }
717
718 clauses.join(" AND ")
719 }
720 pub fn limit(mut self, limit: i32) -> Self {
721 self.limit = Some(limit);
722 self
723 }
724
725 pub fn offset(mut self, offset: i32) -> Self {
726 self.offset = Some(offset);
727 self
728 }
729
730 pub fn include(mut self, relation: impl Into<String>) -> Self {
731 self.includes.push(relation.into());
732 self
733 }
734
735 pub fn with_deleted(mut self) -> Self {
737 self.include_deleted = true;
738 self
739 }
740
741 pub fn allow_unsafe(mut self) -> Self {
742 self.allow_unsafe = true;
743 self
744 }
745
746 pub fn to_sql(&self) -> String {
748 let (where_clause, _) = self.render_where_clause(1);
749 let mut sql = format!("SELECT * FROM {}{}", T::table_name(), where_clause);
750
751 if let Some(limit) = self.limit {
752 sql.push_str(&format!(" LIMIT {}", limit));
753 }
754
755 if let Some(offset) = self.offset {
756 sql.push_str(&format!(" OFFSET {}", offset));
757 }
758
759 sql
760 }
761
762 pub fn to_update_sql(&self, values: &serde_json::Value) -> Result<String, sqlx::Error> {
764 let obj = values.as_object().ok_or_else(|| {
765 sqlx::Error::Protocol("Bulk update requires a JSON object".to_string())
766 })?;
767
768 let mut i = 1;
769 let set_clause = obj
770 .keys()
771 .map(|k| {
772 let p = DB::placeholder(i);
773 i += 1;
774 format!("{} = {}", k, p)
775 })
776 .collect::<Vec<_>>()
777 .join(", ");
778
779 let (where_clause, _) = self.render_where_clause(obj.len() + 1);
780 Ok(format!(
781 "UPDATE {} SET {}{}",
782 T::table_name(),
783 set_clause,
784 where_clause
785 ))
786 }
787
788 pub fn to_delete_sql(&self) -> String {
790 let (where_clause, _) = self.render_where_clause(1);
791 if T::has_soft_delete() {
792 format!(
793 "UPDATE {} SET deleted_at = {}{}",
794 T::table_name(),
795 DB::current_timestamp_fn(),
796 where_clause
797 )
798 } else {
799 format!("DELETE FROM {}{}", T::table_name(), where_clause)
800 }
801 }
802
803 fn render_where_clause(&self, start_index: usize) -> (String, Vec<BindValue>) {
804 let mut clauses = Vec::new();
805 let mut binds = Vec::new();
806 let mut idx = start_index;
807
808 for filter in &self.filters {
809 match filter {
810 FilterExpr::Raw(condition) => {
811 clauses.push(condition.clone());
812 }
813 FilterExpr::Compare { column, op, values } => {
814 if op == "IN" {
815 if values.is_empty() {
816 clauses.push("1=0".to_string());
817 continue;
818 }
819 let placeholders = values
820 .iter()
821 .map(|_| {
822 let p = DB::placeholder(idx);
823 idx += 1;
824 p
825 })
826 .collect::<Vec<_>>()
827 .join(", ");
828 clauses.push(format!("{} IN ({})", column, placeholders));
829 binds.extend(values.iter().cloned());
830 } else {
831 let placeholder = DB::placeholder(idx);
832 idx += 1;
833 clauses.push(format!("{} {} {}", column, op, placeholder));
834 binds.extend(values.iter().cloned());
835 }
836 }
837 FilterExpr::NullCheck { column, is_null } => {
838 if *is_null {
839 clauses.push(format!("{} IS NULL", column));
840 } else {
841 clauses.push(format!("{} IS NOT NULL", column));
842 }
843 }
844 }
845 }
846
847 if T::has_soft_delete() && !self.include_deleted {
848 clauses.push("deleted_at IS NULL".to_string());
849 }
850
851 if clauses.is_empty() {
852 ("".to_string(), binds)
853 } else {
854 (format!(" WHERE {}", clauses.join(" AND ")), binds)
855 }
856 }
857}
858
859impl<'a, T, DB> QueryBuilder<'a, T, DB>
860where
861 DB: SqlDialect,
862 T: Model<DB>,
863 for<'q> <DB as Database>::Arguments<'q>: IntoArguments<'q, DB>,
864 for<'c> &'c mut <DB as Database>::Connection: SqlxExecutor<'c, Database = DB>,
865 for<'c> &'c str: sqlx::ColumnIndex<DB::Row>,
866 DB::Connection: Send,
867 T: Send,
868 String: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
869 i64: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
870 f64: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
871 bool: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
872 Option<String>: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
873{
874 fn ensure_safe_filters(&self) -> Result<(), sqlx::Error> {
875 if self.has_raw_filter && !self.allow_unsafe {
876 return Err(sqlx::Error::Protocol(
877 "Refusing raw filter without allow_unsafe".to_string(),
878 ));
879 }
880 Ok(())
881 }
882
883 pub async fn all(mut self) -> Result<Vec<T>, sqlx::Error> {
884 self.ensure_safe_filters()?;
885 let (where_clause, where_binds) = self.render_where_clause(1);
886 let mut sql = format!("SELECT * FROM {}{}", T::table_name(), where_clause);
887
888 if let Some(limit) = self.limit {
889 sql.push_str(&format!(" LIMIT {}", limit));
890 }
891
892 if let Some(offset) = self.offset {
893 sql.push_str(&format!(" OFFSET {}", offset));
894 }
895
896 tracing::debug!(
897 operation = "select",
898 sql = %sql,
899 filters = %self.format_filters_for_log(),
900 "premix query"
901 );
902 let start = Instant::now();
903 let mut results: Vec<T> = match &mut self.executor {
904 Executor::Pool(pool) => {
905 let mut query = sqlx::query_as::<DB, T>(&sql);
906 for bind in where_binds {
907 query = bind_value_query_as(query, bind);
908 }
909 query.fetch_all(*pool).await?
910 }
911 Executor::Conn(conn) => {
912 let mut query = sqlx::query_as::<DB, T>(&sql);
913 for bind in where_binds {
914 query = bind_value_query_as(query, bind);
915 }
916 query.fetch_all(&mut **conn).await?
917 }
918 };
919 record_query_metrics("select", T::table_name(), start.elapsed());
920
921 for relation in self.includes {
922 match &mut self.executor {
923 Executor::Pool(pool) => {
924 T::eager_load(&mut results, &relation, Executor::Pool(*pool)).await?;
925 }
926 Executor::Conn(conn) => {
927 T::eager_load(&mut results, &relation, Executor::Conn(&mut **conn)).await?;
928 }
929 }
930 }
931
932 Ok(results)
933 }
934
935 #[inline(never)]
937 pub async fn update(mut self, values: serde_json::Value) -> Result<u64, sqlx::Error>
938 where
939 String: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
940 i64: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
941 f64: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
942 bool: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
943 Option<String>: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
944 {
945 self.ensure_safe_filters()?;
946 if self.filters.is_empty() && !self.allow_unsafe {
947 return Err(sqlx::Error::Protocol(
948 "Refusing bulk update without filters".to_string(),
949 ));
950 }
951 let obj = values.as_object().ok_or_else(|| {
952 sqlx::Error::Protocol("Bulk update requires a JSON object".to_string())
953 })?;
954
955 let mut i = 1;
956 let set_clause = obj
957 .keys()
958 .map(|k| {
959 let p = DB::placeholder(i);
960 i += 1;
961 format!("{} = {}", k, p)
962 })
963 .collect::<Vec<_>>()
964 .join(", ");
965
966 let (where_clause, where_binds) = self.render_where_clause(obj.len() + 1);
967 let sql = format!(
968 "UPDATE {} SET {}{}",
969 T::table_name(),
970 set_clause,
971 where_clause
972 );
973
974 tracing::debug!(
975 operation = "bulk_update",
976 sql = %sql,
977 filters = %self.format_filters_for_log(),
978 "premix query"
979 );
980 let mut query = sqlx::query::<DB>(&sql);
981 for val in obj.values() {
982 match val {
983 serde_json::Value::String(s) => query = query.bind(s.clone()),
984 serde_json::Value::Number(n) => {
985 if let Some(v) = n.as_i64() {
986 query = query.bind(v);
987 } else if let Some(v) = n.as_f64() {
988 query = query.bind(v);
989 }
990 }
991 serde_json::Value::Bool(b) => query = query.bind(*b),
992 serde_json::Value::Null => query = query.bind(Option::<String>::None),
993 _ => {
994 return Err(sqlx::Error::Protocol(
995 "Unsupported type in bulk update".to_string(),
996 ));
997 }
998 }
999 }
1000 for bind in where_binds {
1001 query = bind_value_query(query, bind);
1002 }
1003
1004 let start = Instant::now();
1005 let result = match &mut self.executor {
1006 Executor::Pool(pool) => {
1007 let res = query.execute(*pool).await?;
1008 Ok(DB::rows_affected(&res))
1009 }
1010 Executor::Conn(conn) => {
1011 let res = query.execute(&mut **conn).await?;
1012 Ok(DB::rows_affected(&res))
1013 }
1014 };
1015 record_query_metrics("bulk_update", T::table_name(), start.elapsed());
1016 result
1017 }
1018
1019 pub async fn update_all(self, values: serde_json::Value) -> Result<u64, sqlx::Error>
1020 where
1021 String: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
1022 i64: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
1023 f64: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
1024 bool: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
1025 Option<String>: for<'q> sqlx::Encode<'q, DB> + sqlx::Type<DB>,
1026 {
1027 self.update(values).await
1028 }
1029
1030 pub async fn delete(mut self) -> Result<u64, sqlx::Error> {
1031 self.ensure_safe_filters()?;
1032 if self.filters.is_empty() && !self.allow_unsafe {
1033 return Err(sqlx::Error::Protocol(
1034 "Refusing bulk delete without filters".to_string(),
1035 ));
1036 }
1037 let (where_clause, where_binds) = self.render_where_clause(1);
1038 let sql = if T::has_soft_delete() {
1039 format!(
1040 "UPDATE {} SET deleted_at = {}{}",
1041 T::table_name(),
1042 DB::current_timestamp_fn(),
1043 where_clause
1044 )
1045 } else {
1046 format!("DELETE FROM {}{}", T::table_name(), where_clause)
1047 };
1048
1049 tracing::debug!(
1050 operation = "bulk_delete",
1051 sql = %sql,
1052 filters = %self.format_filters_for_log(),
1053 "premix query"
1054 );
1055 let start = Instant::now();
1056 let result = match &mut self.executor {
1057 Executor::Pool(pool) => {
1058 let mut query = sqlx::query::<DB>(&sql);
1059 for bind in where_binds {
1060 query = bind_value_query(query, bind);
1061 }
1062 let res = query.execute(*pool).await?;
1063 Ok(DB::rows_affected(&res))
1064 }
1065 Executor::Conn(conn) => {
1066 let mut query = sqlx::query::<DB>(&sql);
1067 for bind in where_binds {
1068 query = bind_value_query(query, bind);
1069 }
1070 let res = query.execute(&mut **conn).await?;
1071 Ok(DB::rows_affected(&res))
1072 }
1073 };
1074 record_query_metrics("bulk_delete", T::table_name(), start.elapsed());
1075 result
1076 }
1077
1078 pub async fn delete_all(self) -> Result<u64, sqlx::Error> {
1079 self.delete().await
1080 }
1081}
1082
1083impl Premix {
1084 pub fn detect_runtime_profile() -> RuntimeProfile {
1085 if let Ok(value) = std::env::var("PREMIX_ENV") {
1086 let value = value.to_ascii_lowercase();
1087 if value.contains("serverless") || value.contains("lambda") || value.contains("edge") {
1088 return RuntimeProfile::Serverless;
1089 }
1090 if value.contains("server") || value.contains("prod") || value.contains("production") {
1091 return RuntimeProfile::Server;
1092 }
1093 }
1094
1095 if std::env::var_os("AWS_LAMBDA_FUNCTION_NAME").is_some()
1096 || std::env::var_os("LAMBDA_TASK_ROOT").is_some()
1097 || std::env::var_os("K_SERVICE").is_some()
1098 || std::env::var_os("VERCEL").is_some()
1099 {
1100 return RuntimeProfile::Serverless;
1101 }
1102
1103 RuntimeProfile::Server
1104 }
1105
1106 #[cfg(feature = "sqlite")]
1107 pub fn sqlite_pool_options(profile: RuntimeProfile) -> sqlx::sqlite::SqlitePoolOptions {
1108 match profile {
1109 RuntimeProfile::Server => sqlx::sqlite::SqlitePoolOptions::new()
1110 .max_connections(10)
1111 .min_connections(1)
1112 .acquire_timeout(Duration::from_secs(30))
1113 .idle_timeout(Some(Duration::from_secs(10 * 60)))
1114 .max_lifetime(Some(Duration::from_secs(30 * 60))),
1115 RuntimeProfile::Serverless => sqlx::sqlite::SqlitePoolOptions::new()
1116 .max_connections(2)
1117 .min_connections(0)
1118 .acquire_timeout(Duration::from_secs(10))
1119 .idle_timeout(Some(Duration::from_secs(30)))
1120 .max_lifetime(Some(Duration::from_secs(5 * 60))),
1121 }
1122 }
1123
1124 #[cfg(feature = "postgres")]
1125 pub fn postgres_pool_options(profile: RuntimeProfile) -> sqlx::postgres::PgPoolOptions {
1126 match profile {
1127 RuntimeProfile::Server => sqlx::postgres::PgPoolOptions::new()
1128 .max_connections(10)
1129 .min_connections(1)
1130 .acquire_timeout(Duration::from_secs(30))
1131 .idle_timeout(Some(Duration::from_secs(10 * 60)))
1132 .max_lifetime(Some(Duration::from_secs(30 * 60))),
1133 RuntimeProfile::Serverless => sqlx::postgres::PgPoolOptions::new()
1134 .max_connections(2)
1135 .min_connections(0)
1136 .acquire_timeout(Duration::from_secs(10))
1137 .idle_timeout(Some(Duration::from_secs(30)))
1138 .max_lifetime(Some(Duration::from_secs(5 * 60))),
1139 }
1140 }
1141
1142 #[cfg(feature = "sqlite")]
1143 pub async fn smart_sqlite_pool(database_url: &str) -> Result<sqlx::SqlitePool, sqlx::Error> {
1144 Self::sqlite_pool_options(Self::detect_runtime_profile())
1145 .connect(database_url)
1146 .await
1147 }
1148
1149 #[cfg(feature = "sqlite")]
1150 pub async fn smart_sqlite_pool_with_profile(
1151 database_url: &str,
1152 profile: RuntimeProfile,
1153 ) -> Result<sqlx::SqlitePool, sqlx::Error> {
1154 Self::sqlite_pool_options(profile)
1155 .connect(database_url)
1156 .await
1157 }
1158
1159 #[cfg(feature = "postgres")]
1160 pub async fn smart_postgres_pool(database_url: &str) -> Result<sqlx::PgPool, sqlx::Error> {
1161 Self::postgres_pool_options(Self::detect_runtime_profile())
1162 .connect(database_url)
1163 .await
1164 }
1165
1166 #[cfg(feature = "postgres")]
1167 pub async fn smart_postgres_pool_with_profile(
1168 database_url: &str,
1169 profile: RuntimeProfile,
1170 ) -> Result<sqlx::PgPool, sqlx::Error> {
1171 Self::postgres_pool_options(profile)
1172 .connect(database_url)
1173 .await
1174 }
1175
1176 pub fn raw<'q>(sql: &'q str) -> RawQuery<'q> {
1177 RawQuery { sql }
1178 }
1179 pub async fn sync<DB, M>(pool: &sqlx::Pool<DB>) -> Result<(), sqlx::Error>
1180 where
1181 DB: SqlDialect,
1182 M: Model<DB>,
1183 for<'q> <DB as Database>::Arguments<'q>: IntoArguments<'q, DB>,
1184 for<'c> &'c mut <DB as Database>::Connection: SqlxExecutor<'c, Database = DB>,
1185 for<'c> &'c str: sqlx::ColumnIndex<DB::Row>,
1186 {
1187 let sql = M::create_table_sql();
1188 tracing::debug!(operation = "sync", sql = %sql, "premix query");
1189 sqlx::query::<DB>(&sql).execute(pool).await?;
1190 Ok(())
1191 }
1192}
1193
1194impl<'q> RawQuery<'q> {
1195 pub fn fetch_as<DB, T>(self) -> sqlx::query::QueryAs<'q, DB, T, <DB as Database>::Arguments<'q>>
1196 where
1197 DB: Database,
1198 for<'r> T: sqlx::FromRow<'r, DB::Row>,
1199 {
1200 sqlx::query_as::<DB, T>(self.sql)
1201 }
1202}
1203
1204#[cfg(test)]
1205mod tests {
1206 use sqlx::{Sqlite, SqlitePool, sqlite::SqliteRow};
1207 use std::time::Duration;
1208
1209 use super::*;
1210
1211 #[derive(Debug)]
1212 struct SoftDeleteModel {
1213 id: i32,
1214 status: String,
1215 deleted_at: Option<String>,
1216 }
1217
1218 impl ModelHooks for SoftDeleteModel {}
1219
1220 impl ModelValidation for SoftDeleteModel {}
1221
1222 struct HookDummy;
1223
1224 impl ModelHooks for HookDummy {}
1225
1226 #[derive(Debug)]
1227 struct HardDeleteModel {
1228 id: i32,
1229 }
1230
1231 #[derive(Debug, sqlx::FromRow)]
1232 struct DbModel {
1233 id: i32,
1234 status: String,
1235 deleted_at: Option<String>,
1236 }
1237
1238 #[derive(Debug, sqlx::FromRow)]
1239 struct DbHardModel {
1240 id: i32,
1241 status: String,
1242 }
1243
1244 #[derive(Debug, sqlx::FromRow)]
1245 struct SyncModel {
1246 id: i64,
1247 name: String,
1248 }
1249
1250 #[cfg(feature = "postgres")]
1251 const PG_TABLE: &str = "pg_core_items";
1252
1253 #[cfg(feature = "postgres")]
1254 #[derive(Debug, sqlx::FromRow)]
1255 #[allow(dead_code)]
1256 struct PgModel {
1257 id: i32,
1258 name: String,
1259 }
1260
1261 #[cfg(feature = "postgres")]
1262 impl Model<sqlx::Postgres> for PgModel {
1263 fn table_name() -> &'static str {
1264 PG_TABLE
1265 }
1266 fn create_table_sql() -> String {
1267 format!(
1268 "CREATE TABLE IF NOT EXISTS {} (id SERIAL PRIMARY KEY, name TEXT)",
1269 PG_TABLE
1270 )
1271 }
1272 fn list_columns() -> Vec<String> {
1273 vec!["id".into(), "name".into()]
1274 }
1275 fn save<'a, E>(
1276 &'a mut self,
1277 _executor: E,
1278 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1279 where
1280 E: IntoExecutor<'a, DB = sqlx::Postgres>,
1281 {
1282 async move { Ok(()) }
1283 }
1284 fn update<'a, E>(
1285 &'a mut self,
1286 _executor: E,
1287 ) -> impl std::future::Future<Output = Result<UpdateResult, sqlx::Error>> + Send
1288 where
1289 E: IntoExecutor<'a, DB = sqlx::Postgres>,
1290 {
1291 async move { Ok(UpdateResult::NotImplemented) }
1292 }
1293 fn delete<'a, E>(
1294 &'a mut self,
1295 _executor: E,
1296 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1297 where
1298 E: IntoExecutor<'a, DB = sqlx::Postgres>,
1299 {
1300 async move { Ok(()) }
1301 }
1302 fn has_soft_delete() -> bool {
1303 false
1304 }
1305 fn find_by_id<'a, E>(
1306 _executor: E,
1307 _id: i32,
1308 ) -> impl std::future::Future<Output = Result<Option<Self>, sqlx::Error>> + Send
1309 where
1310 E: IntoExecutor<'a, DB = sqlx::Postgres>,
1311 {
1312 async move { Ok(None) }
1313 }
1314 }
1315
1316 #[cfg(feature = "postgres")]
1317 fn pg_url() -> String {
1318 std::env::var("DATABASE_URL").unwrap_or_else(|_| {
1319 "postgres://postgres:admin123@localhost:5432/premix_bench".to_string()
1320 })
1321 }
1322
1323 impl<'r> sqlx::FromRow<'r, SqliteRow> for SoftDeleteModel {
1324 fn from_row(_row: &SqliteRow) -> Result<Self, sqlx::Error> {
1325 Err(sqlx::Error::RowNotFound)
1326 }
1327 }
1328
1329 impl<'r> sqlx::FromRow<'r, SqliteRow> for HardDeleteModel {
1330 fn from_row(_row: &SqliteRow) -> Result<Self, sqlx::Error> {
1331 Err(sqlx::Error::RowNotFound)
1332 }
1333 }
1334
1335 impl Model<Sqlite> for DbModel {
1336 fn table_name() -> &'static str {
1337 "db_users"
1338 }
1339 fn create_table_sql() -> String {
1340 String::new()
1341 }
1342 fn list_columns() -> Vec<String> {
1343 vec!["id".into(), "status".into(), "deleted_at".into()]
1344 }
1345 fn save<'a, E>(
1346 &'a mut self,
1347 _executor: E,
1348 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1349 where
1350 E: IntoExecutor<'a, DB = Sqlite>,
1351 {
1352 async move { Ok(()) }
1353 }
1354 fn update<'a, E>(
1355 &'a mut self,
1356 _executor: E,
1357 ) -> impl std::future::Future<Output = Result<UpdateResult, sqlx::Error>> + Send
1358 where
1359 E: IntoExecutor<'a, DB = Sqlite>,
1360 {
1361 async move { Ok(UpdateResult::NotImplemented) }
1362 }
1363 fn delete<'a, E>(
1364 &'a mut self,
1365 _executor: E,
1366 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1367 where
1368 E: IntoExecutor<'a, DB = Sqlite>,
1369 {
1370 async move { Ok(()) }
1371 }
1372 fn has_soft_delete() -> bool {
1373 true
1374 }
1375 fn sensitive_fields() -> &'static [&'static str] {
1376 &["status"]
1377 }
1378 fn find_by_id<'a, E>(
1379 _executor: E,
1380 _id: i32,
1381 ) -> impl std::future::Future<Output = Result<Option<Self>, sqlx::Error>> + Send
1382 where
1383 E: IntoExecutor<'a, DB = Sqlite>,
1384 {
1385 async move { Ok(None) }
1386 }
1387 }
1388
1389 impl Model<Sqlite> for DbHardModel {
1390 fn table_name() -> &'static str {
1391 "db_hard_users"
1392 }
1393 fn create_table_sql() -> String {
1394 String::new()
1395 }
1396 fn list_columns() -> Vec<String> {
1397 vec!["id".into(), "status".into()]
1398 }
1399 fn save<'a, E>(
1400 &'a mut self,
1401 _executor: E,
1402 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1403 where
1404 E: IntoExecutor<'a, DB = Sqlite>,
1405 {
1406 async move { Ok(()) }
1407 }
1408 fn update<'a, E>(
1409 &'a mut self,
1410 _executor: E,
1411 ) -> impl std::future::Future<Output = Result<UpdateResult, sqlx::Error>> + Send
1412 where
1413 E: IntoExecutor<'a, DB = Sqlite>,
1414 {
1415 async move { Ok(UpdateResult::NotImplemented) }
1416 }
1417 fn delete<'a, E>(
1418 &'a mut self,
1419 _executor: E,
1420 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1421 where
1422 E: IntoExecutor<'a, DB = Sqlite>,
1423 {
1424 async move { Ok(()) }
1425 }
1426 fn has_soft_delete() -> bool {
1427 false
1428 }
1429 fn find_by_id<'a, E>(
1430 _executor: E,
1431 _id: i32,
1432 ) -> impl std::future::Future<Output = Result<Option<Self>, sqlx::Error>> + Send
1433 where
1434 E: IntoExecutor<'a, DB = Sqlite>,
1435 {
1436 async move { Ok(None) }
1437 }
1438 }
1439
1440 impl Model<Sqlite> for SyncModel {
1441 fn table_name() -> &'static str {
1442 "sync_items"
1443 }
1444 fn create_table_sql() -> String {
1445 "CREATE TABLE IF NOT EXISTS sync_items (id INTEGER PRIMARY KEY, name TEXT);".to_string()
1446 }
1447 fn list_columns() -> Vec<String> {
1448 vec!["id".into(), "name".into()]
1449 }
1450 fn save<'a, E>(
1451 &'a mut self,
1452 _executor: E,
1453 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1454 where
1455 E: IntoExecutor<'a, DB = Sqlite>,
1456 {
1457 async move { Ok(()) }
1458 }
1459 fn update<'a, E>(
1460 &'a mut self,
1461 _executor: E,
1462 ) -> impl std::future::Future<Output = Result<UpdateResult, sqlx::Error>> + Send
1463 where
1464 E: IntoExecutor<'a, DB = Sqlite>,
1465 {
1466 async move { Ok(UpdateResult::NotImplemented) }
1467 }
1468 fn delete<'a, E>(
1469 &'a mut self,
1470 _executor: E,
1471 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1472 where
1473 E: IntoExecutor<'a, DB = Sqlite>,
1474 {
1475 async move { Ok(()) }
1476 }
1477 fn has_soft_delete() -> bool {
1478 false
1479 }
1480 fn find_by_id<'a, E>(
1481 _executor: E,
1482 _id: i32,
1483 ) -> impl std::future::Future<Output = Result<Option<Self>, sqlx::Error>> + Send
1484 where
1485 E: IntoExecutor<'a, DB = Sqlite>,
1486 {
1487 async move { Ok(None) }
1488 }
1489 }
1490
1491 impl Model<Sqlite> for SoftDeleteModel {
1492 fn table_name() -> &'static str {
1493 "users"
1494 }
1495 fn create_table_sql() -> String {
1496 String::new()
1497 }
1498 fn list_columns() -> Vec<String> {
1499 Vec::new()
1500 }
1501 fn save<'a, E>(
1502 &'a mut self,
1503 _executor: E,
1504 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1505 where
1506 E: IntoExecutor<'a, DB = Sqlite>,
1507 {
1508 async move { Ok(()) }
1509 }
1510 fn update<'a, E>(
1511 &'a mut self,
1512 _executor: E,
1513 ) -> impl std::future::Future<Output = Result<UpdateResult, sqlx::Error>> + Send
1514 where
1515 E: IntoExecutor<'a, DB = Sqlite>,
1516 {
1517 async move { Ok(UpdateResult::NotImplemented) }
1518 }
1519 fn delete<'a, E>(
1520 &'a mut self,
1521 _executor: E,
1522 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1523 where
1524 E: IntoExecutor<'a, DB = Sqlite>,
1525 {
1526 async move { Ok(()) }
1527 }
1528 fn has_soft_delete() -> bool {
1529 true
1530 }
1531 fn find_by_id<'a, E>(
1532 _executor: E,
1533 _id: i32,
1534 ) -> impl std::future::Future<Output = Result<Option<Self>, sqlx::Error>> + Send
1535 where
1536 E: IntoExecutor<'a, DB = Sqlite>,
1537 {
1538 async move { Ok(None) }
1539 }
1540 }
1541
1542 impl Model<Sqlite> for HardDeleteModel {
1543 fn table_name() -> &'static str {
1544 "hard_users"
1545 }
1546 fn create_table_sql() -> String {
1547 String::new()
1548 }
1549 fn list_columns() -> Vec<String> {
1550 Vec::new()
1551 }
1552 fn save<'a, E>(
1553 &'a mut self,
1554 _executor: E,
1555 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1556 where
1557 E: IntoExecutor<'a, DB = Sqlite>,
1558 {
1559 async move { Ok(()) }
1560 }
1561 fn update<'a, E>(
1562 &'a mut self,
1563 _executor: E,
1564 ) -> impl std::future::Future<Output = Result<UpdateResult, sqlx::Error>> + Send
1565 where
1566 E: IntoExecutor<'a, DB = Sqlite>,
1567 {
1568 async move { Ok(UpdateResult::NotImplemented) }
1569 }
1570 fn delete<'a, E>(
1571 &'a mut self,
1572 _executor: E,
1573 ) -> impl std::future::Future<Output = Result<(), sqlx::Error>> + Send
1574 where
1575 E: IntoExecutor<'a, DB = Sqlite>,
1576 {
1577 async move { Ok(()) }
1578 }
1579 fn has_soft_delete() -> bool {
1580 false
1581 }
1582 fn find_by_id<'a, E>(
1583 _executor: E,
1584 _id: i32,
1585 ) -> impl std::future::Future<Output = Result<Option<Self>, sqlx::Error>> + Send
1586 where
1587 E: IntoExecutor<'a, DB = Sqlite>,
1588 {
1589 async move { Ok(None) }
1590 }
1591 }
1592
1593 #[tokio::test]
1594 async fn query_builder_to_sql_includes_soft_delete_filter() {
1595 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1596 let query = SoftDeleteModel::find_in_pool(&pool)
1597 .filter_gt("age", 18)
1598 .limit(10)
1599 .offset(5);
1600 let sql = query.to_sql();
1601 assert!(sql.contains("FROM users"));
1602 assert!(sql.contains("age > ?"));
1603 assert!(sql.contains("deleted_at IS NULL"));
1604 assert!(sql.contains("LIMIT 10"));
1605 assert!(sql.contains("OFFSET 5"));
1606 }
1607
1608 #[tokio::test]
1609 async fn query_builder_to_sql_without_filters_has_no_where_for_hard_delete() {
1610 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1611 let query = HardDeleteModel::find_in_pool(&pool);
1612 let sql = query.to_sql();
1613 assert!(sql.contains("FROM hard_users"));
1614 assert!(!sql.contains(" WHERE "));
1615 }
1616
1617 #[tokio::test]
1618 async fn query_builder_with_deleted_skips_soft_delete_filter() {
1619 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1620 let query = SoftDeleteModel::find_in_pool(&pool)
1621 .filter_gt("age", 18)
1622 .with_deleted();
1623 let sql = query.to_sql();
1624 assert!(sql.contains("age > ?"));
1625 assert!(!sql.contains("deleted_at IS NULL"));
1626 }
1627
1628 #[tokio::test]
1629 async fn query_builder_to_update_sql_includes_fields() {
1630 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1631 let query = SoftDeleteModel::find_in_pool(&pool).filter_eq("status", "inactive");
1632 let sql = query
1633 .to_update_sql(&serde_json::json!({ "status": "active", "age": 1 }))
1634 .unwrap();
1635 assert!(sql.contains("UPDATE users SET"));
1636 assert!(sql.contains("status"));
1637 assert!(sql.contains("age"));
1638 assert!(sql.contains("WHERE"));
1639 }
1640
1641 #[tokio::test]
1642 async fn query_builder_to_update_sql_rejects_non_object() {
1643 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1644 let query = SoftDeleteModel::find_in_pool(&pool);
1645 let err = query.to_update_sql(&serde_json::json!("bad")).unwrap_err();
1646 assert!(
1647 err.to_string()
1648 .contains("Bulk update requires a JSON object")
1649 );
1650 }
1651
1652 #[tokio::test]
1653 async fn query_builder_to_delete_sql_soft_delete() {
1654 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1655 let query = SoftDeleteModel::find_in_pool(&pool).filter_eq("id", 1);
1656 let sql = query.to_delete_sql();
1657 assert!(sql.starts_with("UPDATE users SET deleted_at"));
1658 }
1659
1660 #[tokio::test]
1661 async fn query_builder_to_delete_sql_hard_delete() {
1662 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1663 let query = HardDeleteModel::find_in_pool(&pool).filter_eq("id", 1);
1664 let sql = query.to_delete_sql();
1665 assert!(sql.starts_with("DELETE FROM hard_users"));
1666 }
1667
1668 #[test]
1669 fn model_raw_sql_compiles() {
1670 let _query = SoftDeleteModel::raw_sql("SELECT * FROM users");
1671 }
1672
1673 #[tokio::test]
1674 async fn premix_raw_fetch_as_maps_struct() {
1675 #[derive(Debug, sqlx::FromRow)]
1676 struct ReportRow {
1677 count: i64,
1678 }
1679
1680 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1681 sqlx::query("CREATE TABLE report_items (id INTEGER PRIMARY KEY);")
1682 .execute(&pool)
1683 .await
1684 .unwrap();
1685 sqlx::query("INSERT INTO report_items DEFAULT VALUES;")
1686 .execute(&pool)
1687 .await
1688 .unwrap();
1689
1690 let row = Premix::raw("SELECT COUNT(*) as count FROM report_items")
1691 .fetch_as::<Sqlite, ReportRow>()
1692 .fetch_one(&pool)
1693 .await
1694 .unwrap();
1695 assert_eq!(row.count, 1);
1696 }
1697
1698 #[cfg(feature = "sqlite")]
1699 #[test]
1700 fn smart_sqlite_pool_options_serverless() {
1701 let opts = Premix::sqlite_pool_options(RuntimeProfile::Serverless);
1702 assert_eq!(opts.get_max_connections(), 2);
1703 assert_eq!(opts.get_min_connections(), 0);
1704 assert_eq!(opts.get_acquire_timeout(), Duration::from_secs(10));
1705 assert_eq!(opts.get_idle_timeout(), Some(Duration::from_secs(30)));
1706 assert_eq!(opts.get_max_lifetime(), Some(Duration::from_secs(5 * 60)));
1707 }
1708
1709 #[cfg(feature = "postgres")]
1710 #[test]
1711 fn smart_postgres_pool_options_server() {
1712 let opts = Premix::postgres_pool_options(RuntimeProfile::Server);
1713 assert_eq!(opts.get_max_connections(), 10);
1714 assert_eq!(opts.get_min_connections(), 1);
1715 assert_eq!(opts.get_acquire_timeout(), Duration::from_secs(30));
1716 assert_eq!(opts.get_idle_timeout(), Some(Duration::from_secs(10 * 60)));
1717 assert_eq!(opts.get_max_lifetime(), Some(Duration::from_secs(30 * 60)));
1718 }
1719
1720 #[test]
1721 fn sqlite_placeholder_uses_question_mark() {
1722 assert_eq!(Sqlite::placeholder(1), "?");
1723 assert_eq!(Sqlite::placeholder(5), "?");
1724 }
1725
1726 #[test]
1727 fn sqlite_timestamp_fn_is_constant() {
1728 assert_eq!(Sqlite::current_timestamp_fn(), "CURRENT_TIMESTAMP");
1729 }
1730
1731 #[test]
1732 fn sqlite_type_helpers_are_static() {
1733 assert_eq!(Sqlite::int_type(), "INTEGER");
1734 assert_eq!(Sqlite::text_type(), "TEXT");
1735 assert_eq!(Sqlite::bool_type(), "BOOLEAN");
1736 assert_eq!(Sqlite::float_type(), "REAL");
1737 }
1738
1739 #[test]
1740 fn sqlite_auto_increment_pk_is_integer() {
1741 assert!(Sqlite::auto_increment_pk().contains("INTEGER"));
1742 }
1743
1744 #[tokio::test]
1745 async fn executor_execute_and_fetch() {
1746 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1747 sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT);")
1748 .execute(&pool)
1749 .await
1750 .unwrap();
1751
1752 let mut executor = Executor::Pool(&pool);
1753 executor
1754 .execute(sqlx::query("INSERT INTO items (name) VALUES ('a');"))
1755 .await
1756 .unwrap();
1757
1758 let mut executor = Executor::Pool(&pool);
1759 let row = executor
1760 .fetch_optional(sqlx::query_as::<Sqlite, (i64, String)>(
1761 "SELECT id, name FROM items WHERE name = 'a'",
1762 ))
1763 .await
1764 .unwrap();
1765 let (id, name) = row.unwrap();
1766 assert_eq!(name, "a");
1767 assert!(id > 0);
1768
1769 let mut executor = Executor::Pool(&pool);
1770 let rows = executor
1771 .fetch_all(sqlx::query_as::<Sqlite, (i64, String)>(
1772 "SELECT id, name FROM items",
1773 ))
1774 .await
1775 .unwrap();
1776 assert_eq!(rows.len(), 1);
1777 }
1778
1779 #[tokio::test]
1780 async fn executor_execute_and_fetch_with_conn() {
1781 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1782 sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT);")
1783 .execute(&pool)
1784 .await
1785 .unwrap();
1786
1787 let mut conn = pool.acquire().await.unwrap();
1788 let mut executor: Executor<'_, Sqlite> = Executor::Conn(&mut *conn);
1789 executor
1790 .execute(sqlx::query("INSERT INTO items (name) VALUES ('b');"))
1791 .await
1792 .unwrap();
1793
1794 let mut executor: Executor<'_, Sqlite> = Executor::Conn(&mut *conn);
1795 let row = executor
1796 .fetch_optional(sqlx::query_as::<Sqlite, (i64, String)>(
1797 "SELECT id, name FROM items WHERE name = 'b'",
1798 ))
1799 .await
1800 .unwrap();
1801 let (id, name) = row.unwrap();
1802 assert_eq!(name, "b");
1803 assert!(id > 0);
1804
1805 let mut executor: Executor<'_, Sqlite> = Executor::Conn(&mut *conn);
1806 let rows = executor
1807 .fetch_all(sqlx::query_as::<Sqlite, (i64, String)>(
1808 "SELECT id, name FROM items",
1809 ))
1810 .await
1811 .unwrap();
1812 assert_eq!(rows.len(), 1);
1813 }
1814
1815 #[tokio::test]
1816 async fn model_find_builds_query() {
1817 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1818 let sql = DbModel::find(&pool).filter_eq("status", "active").to_sql();
1819 assert!(sql.contains("status = ?"));
1820 }
1821
1822 #[tokio::test]
1823 async fn sqlite_last_insert_id_matches_rowid() {
1824 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1825 sqlx::query("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT);")
1826 .execute(&pool)
1827 .await
1828 .unwrap();
1829
1830 let mut conn = pool.acquire().await.unwrap();
1831 let res = sqlx::query("INSERT INTO items (name) VALUES ('alpha');")
1832 .execute(&mut *conn)
1833 .await
1834 .unwrap();
1835 let last_id = <Sqlite as SqlDialect>::last_insert_id(&res);
1836
1837 let rowid: i64 = sqlx::query_scalar("SELECT last_insert_rowid()")
1838 .fetch_one(&mut *conn)
1839 .await
1840 .unwrap();
1841 assert_eq!(last_id, rowid);
1842 }
1843
1844 #[tokio::test]
1845 async fn query_builder_update_executes() {
1846 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1847 sqlx::query(
1848 "CREATE TABLE db_users (id INTEGER PRIMARY KEY, status TEXT, flag INTEGER, deleted_at TEXT);",
1849 )
1850 .execute(&pool)
1851 .await
1852 .unwrap();
1853 sqlx::query("INSERT INTO db_users (status) VALUES ('inactive');")
1854 .execute(&pool)
1855 .await
1856 .unwrap();
1857
1858 let updated = DbModel::find_in_pool(&pool)
1859 .filter_eq("status", "inactive")
1860 .update(serde_json::json!({ "status": "active" }))
1861 .await
1862 .unwrap();
1863 assert_eq!(updated, 1);
1864
1865 let count: i64 =
1866 sqlx::query_scalar("SELECT COUNT(*) FROM db_users WHERE status = 'active'")
1867 .fetch_one(&pool)
1868 .await
1869 .unwrap();
1870 assert_eq!(count, 1);
1871 }
1872
1873 #[tokio::test]
1874 async fn query_builder_update_binds_bool_and_null() {
1875 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1876 sqlx::query(
1877 "CREATE TABLE db_users (id INTEGER PRIMARY KEY, status TEXT, flag INTEGER, deleted_at TEXT);",
1878 )
1879 .execute(&pool)
1880 .await
1881 .unwrap();
1882 sqlx::query("INSERT INTO db_users (status) VALUES ('inactive');")
1883 .execute(&pool)
1884 .await
1885 .unwrap();
1886
1887 let updated = DbModel::find_in_pool(&pool)
1888 .filter_eq("id", 1)
1889 .update(serde_json::json!({ "status": "active", "flag": true, "deleted_at": null }))
1890 .await
1891 .unwrap();
1892 assert_eq!(updated, 1);
1893 }
1894
1895 #[tokio::test]
1896 async fn query_builder_update_binds_float() {
1897 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1898 sqlx::query("CREATE TABLE db_users (id INTEGER PRIMARY KEY, ratio REAL, deleted_at TEXT);")
1899 .execute(&pool)
1900 .await
1901 .unwrap();
1902 sqlx::query("INSERT INTO db_users (ratio) VALUES (0.5);")
1903 .execute(&pool)
1904 .await
1905 .unwrap();
1906
1907 let updated = DbModel::find_in_pool(&pool)
1908 .filter_eq("id", 1)
1909 .update(serde_json::json!({ "ratio": 1.75 }))
1910 .await
1911 .unwrap();
1912 assert_eq!(updated, 1);
1913
1914 let ratio: f64 = sqlx::query_scalar("SELECT ratio FROM db_users WHERE id = 1")
1915 .fetch_one(&pool)
1916 .await
1917 .unwrap();
1918 assert_eq!(ratio, 1.75);
1919 }
1920
1921 #[tokio::test]
1922 async fn query_builder_update_rejects_unsupported_type() {
1923 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1924 sqlx::query(
1925 "CREATE TABLE db_users (id INTEGER PRIMARY KEY, status TEXT, deleted_at TEXT);",
1926 )
1927 .execute(&pool)
1928 .await
1929 .unwrap();
1930 sqlx::query("INSERT INTO db_users (status) VALUES ('inactive');")
1931 .execute(&pool)
1932 .await
1933 .unwrap();
1934
1935 let err = DbModel::find_in_pool(&pool)
1936 .filter_eq("id", 1)
1937 .update(serde_json::json!({ "meta": { "a": 1 } }))
1938 .await
1939 .unwrap_err();
1940 assert!(err.to_string().contains("Unsupported type in bulk update"));
1941 }
1942
1943 #[tokio::test]
1944 async fn query_builder_soft_delete_executes() {
1945 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1946 sqlx::query(
1947 "CREATE TABLE db_users (id INTEGER PRIMARY KEY, status TEXT, deleted_at TEXT);",
1948 )
1949 .execute(&pool)
1950 .await
1951 .unwrap();
1952 sqlx::query("INSERT INTO db_users (status) VALUES ('active');")
1953 .execute(&pool)
1954 .await
1955 .unwrap();
1956
1957 let deleted = DbModel::find_in_pool(&pool)
1958 .filter_eq("status", "active")
1959 .delete()
1960 .await
1961 .unwrap();
1962 assert_eq!(deleted, 1);
1963
1964 let count: i64 =
1965 sqlx::query_scalar("SELECT COUNT(*) FROM db_users WHERE deleted_at IS NOT NULL")
1966 .fetch_one(&pool)
1967 .await
1968 .unwrap();
1969 assert_eq!(count, 1);
1970 }
1971
1972 #[tokio::test]
1973 async fn query_builder_hard_delete_executes() {
1974 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
1975 sqlx::query("CREATE TABLE db_hard_users (id INTEGER PRIMARY KEY, status TEXT);")
1976 .execute(&pool)
1977 .await
1978 .unwrap();
1979 sqlx::query("INSERT INTO db_hard_users (status) VALUES ('active');")
1980 .execute(&pool)
1981 .await
1982 .unwrap();
1983
1984 let deleted = DbHardModel::find_in_pool(&pool)
1985 .filter_eq("status", "active")
1986 .delete()
1987 .await
1988 .unwrap();
1989 assert_eq!(deleted, 1);
1990
1991 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM db_hard_users")
1992 .fetch_one(&pool)
1993 .await
1994 .unwrap();
1995 assert_eq!(count, 0);
1996 }
1997
1998 #[tokio::test]
1999 async fn query_builder_all_with_limit_offset() {
2000 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2001 sqlx::query(
2002 "CREATE TABLE db_users (id INTEGER PRIMARY KEY, status TEXT, deleted_at TEXT);",
2003 )
2004 .execute(&pool)
2005 .await
2006 .unwrap();
2007 sqlx::query("INSERT INTO db_users (status) VALUES ('a'), ('b'), ('c');")
2008 .execute(&pool)
2009 .await
2010 .unwrap();
2011
2012 let rows = DbModel::find_in_pool(&pool)
2013 .include("posts")
2014 .limit(1)
2015 .offset(1)
2016 .all()
2017 .await
2018 .unwrap();
2019 assert_eq!(rows.len(), 1);
2020 }
2021
2022 #[tokio::test]
2023 async fn query_builder_filter_eq_uses_placeholders() {
2024 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2025 let sql = DbModel::find_in_pool(&pool)
2026 .filter_eq("status", "active")
2027 .to_sql();
2028 assert!(sql.contains("status = ?"));
2029 }
2030
2031 #[tokio::test]
2032 async fn query_builder_filters_mask_sensitive_fields() {
2033 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2034 let query = DbModel::find_in_pool(&pool)
2035 .filter_eq("status", "active")
2036 .filter_eq("id", 1);
2037 let filters = query.format_filters_for_log();
2038 assert!(filters.contains("status = ***"));
2039 assert!(filters.contains("id = 1"));
2040 }
2041
2042 #[tokio::test]
2043 async fn query_builder_all_excludes_soft_deleted_by_default() {
2044 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2045 sqlx::query(
2046 "CREATE TABLE db_users (id INTEGER PRIMARY KEY, status TEXT, deleted_at TEXT);",
2047 )
2048 .execute(&pool)
2049 .await
2050 .unwrap();
2051 sqlx::query(
2052 "INSERT INTO db_users (status, deleted_at) VALUES ('active', NULL), ('gone', 'x');",
2053 )
2054 .execute(&pool)
2055 .await
2056 .unwrap();
2057
2058 let rows = DbModel::find_in_pool(&pool).all().await.unwrap();
2059 assert_eq!(rows.len(), 1);
2060
2061 let rows = DbModel::find_in_pool(&pool)
2062 .with_deleted()
2063 .all()
2064 .await
2065 .unwrap();
2066 assert_eq!(rows.len(), 2);
2067 }
2068
2069 #[tokio::test]
2070 async fn query_builder_all_in_tx_uses_conn_executor() {
2071 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2072 sqlx::query("CREATE TABLE db_hard_users (id INTEGER PRIMARY KEY, status TEXT);")
2073 .execute(&pool)
2074 .await
2075 .unwrap();
2076 sqlx::query("INSERT INTO db_hard_users (status) VALUES ('active');")
2077 .execute(&pool)
2078 .await
2079 .unwrap();
2080
2081 let mut tx = pool.begin().await.unwrap();
2082 let rows = DbHardModel::find_in_tx(&mut tx).all().await.unwrap();
2083 assert_eq!(rows.len(), 1);
2084 tx.commit().await.unwrap();
2085 }
2086
2087 #[tokio::test]
2088 async fn query_builder_update_in_tx_uses_conn_executor() {
2089 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2090 sqlx::query(
2091 "CREATE TABLE db_users (id INTEGER PRIMARY KEY, status TEXT, deleted_at TEXT);",
2092 )
2093 .execute(&pool)
2094 .await
2095 .unwrap();
2096 sqlx::query("INSERT INTO db_users (status) VALUES ('inactive');")
2097 .execute(&pool)
2098 .await
2099 .unwrap();
2100
2101 let mut tx = pool.begin().await.unwrap();
2102 let updated = DbModel::find_in_tx(&mut tx)
2103 .filter_eq("status", "inactive")
2104 .update(serde_json::json!({ "status": "active" }))
2105 .await
2106 .unwrap();
2107 assert_eq!(updated, 1);
2108 tx.commit().await.unwrap();
2109 }
2110
2111 #[tokio::test]
2112 async fn query_builder_delete_in_tx_uses_conn_executor() {
2113 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2114 sqlx::query("CREATE TABLE db_hard_users (id INTEGER PRIMARY KEY, status TEXT);")
2115 .execute(&pool)
2116 .await
2117 .unwrap();
2118 sqlx::query("INSERT INTO db_hard_users (status) VALUES ('active');")
2119 .execute(&pool)
2120 .await
2121 .unwrap();
2122
2123 let mut tx = pool.begin().await.unwrap();
2124 let deleted = DbHardModel::find_in_tx(&mut tx)
2125 .filter_eq("status", "active")
2126 .delete()
2127 .await
2128 .unwrap();
2129 assert_eq!(deleted, 1);
2130 tx.commit().await.unwrap();
2131 }
2132
2133 #[tokio::test]
2134 async fn query_builder_update_without_filters_is_rejected() {
2135 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2136 let err = DbModel::find_in_pool(&pool)
2137 .update(serde_json::json!({ "status": "active" }))
2138 .await
2139 .unwrap_err();
2140 assert!(
2141 err.to_string()
2142 .contains("Refusing bulk update without filters")
2143 );
2144 }
2145
2146 #[tokio::test]
2147 async fn query_builder_update_all_matches_update() {
2148 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2149 sqlx::query(
2150 "CREATE TABLE db_users (id INTEGER PRIMARY KEY, status TEXT, deleted_at TEXT);",
2151 )
2152 .execute(&pool)
2153 .await
2154 .unwrap();
2155 sqlx::query("INSERT INTO db_users (status) VALUES ('inactive');")
2156 .execute(&pool)
2157 .await
2158 .unwrap();
2159
2160 let updated = DbModel::find_in_pool(&pool)
2161 .filter_eq("status", "inactive")
2162 .update_all(serde_json::json!({ "status": "active" }))
2163 .await
2164 .unwrap();
2165 assert_eq!(updated, 1);
2166 }
2167
2168 #[tokio::test]
2169 async fn query_builder_delete_without_filters_is_rejected() {
2170 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2171 let err = DbHardModel::find_in_pool(&pool).delete().await.unwrap_err();
2172 assert!(
2173 err.to_string()
2174 .contains("Refusing bulk delete without filters")
2175 );
2176 }
2177
2178 #[tokio::test]
2179 async fn query_builder_delete_all_without_filters_is_rejected() {
2180 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2181 let err = DbHardModel::find_in_pool(&pool)
2182 .delete_all()
2183 .await
2184 .unwrap_err();
2185 assert!(
2186 err.to_string()
2187 .contains("Refusing bulk delete without filters")
2188 );
2189 }
2190
2191 #[tokio::test]
2192 async fn query_builder_delete_all_matches_delete() {
2193 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2194 sqlx::query("CREATE TABLE db_hard_users (id INTEGER PRIMARY KEY, status TEXT);")
2195 .execute(&pool)
2196 .await
2197 .unwrap();
2198 sqlx::query("INSERT INTO db_hard_users (status) VALUES ('active');")
2199 .execute(&pool)
2200 .await
2201 .unwrap();
2202
2203 let deleted = DbHardModel::find_in_pool(&pool)
2204 .filter_eq("status", "active")
2205 .delete_all()
2206 .await
2207 .unwrap();
2208 assert_eq!(deleted, 1);
2209 }
2210
2211 #[tokio::test]
2212 async fn query_builder_delete_without_filters_allows_unsafe() {
2213 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2214 sqlx::query("CREATE TABLE db_hard_users (id INTEGER PRIMARY KEY, status TEXT);")
2215 .execute(&pool)
2216 .await
2217 .unwrap();
2218 sqlx::query("INSERT INTO db_hard_users (status) VALUES ('active');")
2219 .execute(&pool)
2220 .await
2221 .unwrap();
2222
2223 let deleted = DbHardModel::find_in_pool(&pool)
2224 .allow_unsafe()
2225 .delete()
2226 .await
2227 .unwrap();
2228 assert_eq!(deleted, 1);
2229 }
2230
2231 #[tokio::test]
2232 async fn query_builder_delete_all_without_filters_allows_unsafe() {
2233 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2234 sqlx::query("CREATE TABLE db_hard_users (id INTEGER PRIMARY KEY, status TEXT);")
2235 .execute(&pool)
2236 .await
2237 .unwrap();
2238 sqlx::query("INSERT INTO db_hard_users (status) VALUES ('active');")
2239 .execute(&pool)
2240 .await
2241 .unwrap();
2242
2243 let deleted = DbHardModel::find_in_pool(&pool)
2244 .allow_unsafe()
2245 .delete_all()
2246 .await
2247 .unwrap();
2248 assert_eq!(deleted, 1);
2249 }
2250
2251 #[tokio::test]
2252 async fn query_builder_update_rollback_does_not_persist() {
2253 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2254 sqlx::query(
2255 "CREATE TABLE db_users (id INTEGER PRIMARY KEY, status TEXT, deleted_at TEXT);",
2256 )
2257 .execute(&pool)
2258 .await
2259 .unwrap();
2260 sqlx::query("INSERT INTO db_users (status) VALUES ('inactive');")
2261 .execute(&pool)
2262 .await
2263 .unwrap();
2264
2265 let mut tx = pool.begin().await.unwrap();
2266 let updated = DbModel::find_in_tx(&mut tx)
2267 .filter_eq("status", "inactive")
2268 .update(serde_json::json!({ "status": "active" }))
2269 .await
2270 .unwrap();
2271 assert_eq!(updated, 1);
2272 tx.rollback().await.unwrap();
2273
2274 let count: i64 =
2275 sqlx::query_scalar("SELECT COUNT(*) FROM db_users WHERE status = 'active'")
2276 .fetch_one(&pool)
2277 .await
2278 .unwrap();
2279 assert_eq!(count, 0);
2280 }
2281
2282 #[tokio::test]
2283 async fn test_utils_with_test_transaction_rolls_back() {
2284 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2285 sqlx::query("CREATE TABLE db_tx (id INTEGER PRIMARY KEY, name TEXT);")
2286 .execute(&pool)
2287 .await
2288 .unwrap();
2289
2290 with_test_transaction(&pool, |conn| {
2291 Box::pin(async move {
2292 sqlx::query("INSERT INTO db_tx (name) VALUES ('alpha');")
2293 .execute(conn)
2294 .await?;
2295 Ok(())
2296 })
2297 })
2298 .await
2299 .unwrap();
2300
2301 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM db_tx")
2302 .fetch_one(&pool)
2303 .await
2304 .unwrap();
2305 assert_eq!(count, 0);
2306 }
2307
2308 #[cfg(feature = "sqlite")]
2309 #[tokio::test]
2310 async fn test_utils_mock_database_sqlite_works() {
2311 let mock = MockDatabase::new_sqlite().await.unwrap();
2312 sqlx::query("CREATE TABLE db_mock (id INTEGER PRIMARY KEY);")
2313 .execute(mock.pool())
2314 .await
2315 .unwrap();
2316 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM db_mock")
2317 .fetch_one(mock.pool())
2318 .await
2319 .unwrap();
2320 assert_eq!(count, 0);
2321 }
2322
2323 #[tokio::test]
2324 async fn default_model_hooks_are_noops() {
2325 let mut model = SoftDeleteModel {
2326 id: 1,
2327 status: "active".to_string(),
2328 deleted_at: None,
2329 };
2330 model.before_save().await.unwrap();
2331 model.after_save().await.unwrap();
2332 }
2333
2334 #[test]
2335 fn default_model_validation_is_ok() {
2336 let model = SoftDeleteModel {
2337 id: 1,
2338 status: "active".to_string(),
2339 deleted_at: None,
2340 };
2341 assert!(model.validate().is_ok());
2342 }
2343
2344 #[tokio::test]
2345 async fn eager_load_default_is_ok() {
2346 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2347 let mut models = vec![SoftDeleteModel {
2348 id: 1,
2349 status: "active".to_string(),
2350 deleted_at: None,
2351 }];
2352 SoftDeleteModel::eager_load(&mut models, "posts", Executor::Pool(&pool))
2353 .await
2354 .unwrap();
2355 }
2356
2357 #[tokio::test]
2358 async fn premix_sync_creates_table() {
2359 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2360 Premix::sync::<Sqlite, SyncModel>(&pool).await.unwrap();
2361
2362 let name: Option<String> = sqlx::query_scalar(
2363 "SELECT name FROM sqlite_master WHERE type='table' AND name='sync_items'",
2364 )
2365 .fetch_optional(&pool)
2366 .await
2367 .unwrap();
2368 assert_eq!(name.as_deref(), Some("sync_items"));
2369 }
2370
2371 #[tokio::test]
2372 async fn model_stub_methods_are_noops() {
2373 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2374
2375 let mut db = DbModel {
2376 id: 1,
2377 status: "active".to_string(),
2378 deleted_at: None,
2379 };
2380 db.save(&pool).await.unwrap();
2381 assert_eq!(
2382 db.update(&pool).await.unwrap(),
2383 UpdateResult::NotImplemented
2384 );
2385 db.delete(&pool).await.unwrap();
2386 assert!(DbModel::find_by_id(&pool, 1).await.unwrap().is_none());
2387
2388 let mut hard = DbHardModel {
2389 id: 2,
2390 status: "inactive".to_string(),
2391 };
2392 hard.save(&pool).await.unwrap();
2393 assert_eq!(
2394 hard.update(&pool).await.unwrap(),
2395 UpdateResult::NotImplemented
2396 );
2397 hard.delete(&pool).await.unwrap();
2398 assert!(DbHardModel::find_by_id(&pool, 2).await.unwrap().is_none());
2399
2400 let mut soft = SoftDeleteModel {
2401 id: 3,
2402 status: "active".to_string(),
2403 deleted_at: None,
2404 };
2405 soft.save(&pool).await.unwrap();
2406 assert_eq!(
2407 soft.update(&pool).await.unwrap(),
2408 UpdateResult::NotImplemented
2409 );
2410 soft.delete(&pool).await.unwrap();
2411 assert!(
2412 SoftDeleteModel::find_by_id(&pool, 3)
2413 .await
2414 .unwrap()
2415 .is_none()
2416 );
2417
2418 let mut hard_only = HardDeleteModel { id: 4 };
2419 hard_only.save(&pool).await.unwrap();
2420 assert_eq!(
2421 hard_only.update(&pool).await.unwrap(),
2422 UpdateResult::NotImplemented
2423 );
2424 hard_only.delete(&pool).await.unwrap();
2425 assert!(
2426 HardDeleteModel::find_by_id(&pool, 4)
2427 .await
2428 .unwrap()
2429 .is_none()
2430 );
2431
2432 let mut sync = SyncModel {
2433 id: 5,
2434 name: "sync".to_string(),
2435 };
2436 sync.save(&pool).await.unwrap();
2437 assert_eq!(
2438 sync.update(&pool).await.unwrap(),
2439 UpdateResult::NotImplemented
2440 );
2441 sync.delete(&pool).await.unwrap();
2442 assert!(SyncModel::find_by_id(&pool, 5).await.unwrap().is_none());
2443 }
2444
2445 #[cfg(feature = "postgres")]
2446 #[tokio::test]
2447 async fn postgres_dialect_and_query_builder_work() {
2448 let url = pg_url();
2449 let pool = match sqlx::PgPool::connect(&url).await {
2450 Ok(pool) => pool,
2451 Err(_) => return,
2452 };
2453 sqlx::query(&format!("DROP TABLE IF EXISTS {}", PG_TABLE))
2454 .execute(&pool)
2455 .await
2456 .unwrap();
2457 sqlx::query(&format!(
2458 "CREATE TABLE {} (id SERIAL PRIMARY KEY, name TEXT)",
2459 PG_TABLE
2460 ))
2461 .execute(&pool)
2462 .await
2463 .unwrap();
2464
2465 assert_eq!(sqlx::Postgres::placeholder(1), "$1");
2466 assert_eq!(sqlx::Postgres::auto_increment_pk(), "SERIAL PRIMARY KEY");
2467
2468 let mut conn = pool.acquire().await.unwrap();
2469 let mut executor = (&mut *conn).into_executor();
2470 let res = executor
2471 .execute(sqlx::query(&format!(
2472 "INSERT INTO {} (name) VALUES ('alpha')",
2473 PG_TABLE
2474 )))
2475 .await
2476 .unwrap();
2477 assert_eq!(<sqlx::Postgres as SqlDialect>::rows_affected(&res), 1);
2478 assert_eq!(<sqlx::Postgres as SqlDialect>::last_insert_id(&res), 0);
2479
2480 let updated = PgModel::find_in_pool(&pool)
2481 .filter_eq("name", "alpha")
2482 .update(serde_json::json!({ "name": "beta" }))
2483 .await
2484 .unwrap();
2485 assert_eq!(updated, 1);
2486
2487 let names: Vec<String> = sqlx::query_scalar(&format!("SELECT name FROM {}", PG_TABLE))
2488 .fetch_all(&pool)
2489 .await
2490 .unwrap();
2491 assert_eq!(names, vec!["beta".to_string()]);
2492
2493 let sql = PgModel::find_in_pool(&pool)
2494 .filter_eq("id", 1)
2495 .to_update_sql(&serde_json::json!({ "name": "gamma" }))
2496 .unwrap();
2497 assert!(sql.contains("name = $1"));
2498
2499 sqlx::query(&format!("DROP TABLE IF EXISTS {}", PG_TABLE))
2500 .execute(&pool)
2501 .await
2502 .unwrap();
2503 }
2504
2505 #[test]
2506 fn test_models_use_fields() {
2507 let soft = SoftDeleteModel {
2508 id: 1,
2509 status: "active".to_string(),
2510 deleted_at: None,
2511 };
2512 let hard = HardDeleteModel { id: 2 };
2513 let db = DbModel {
2514 id: 3,
2515 status: "ok".to_string(),
2516 deleted_at: Some("x".to_string()),
2517 };
2518 let db_hard = DbHardModel {
2519 id: 4,
2520 status: "ok".to_string(),
2521 };
2522 let sync = SyncModel {
2523 id: 5,
2524 name: "sync".to_string(),
2525 };
2526 assert_eq!(soft.id, 1);
2527 assert_eq!(soft.status, "active");
2528 assert!(soft.deleted_at.is_none());
2529 assert_eq!(hard.id, 2);
2530 assert_eq!(db.id, 3);
2531 assert_eq!(db.status, "ok");
2532 assert_eq!(db.deleted_at.as_deref(), Some("x"));
2533 assert_eq!(db_hard.id, 4);
2534 assert_eq!(db_hard.status, "ok");
2535 assert_eq!(sync.id, 5);
2536 assert_eq!(sync.name, "sync");
2537 }
2538
2539 #[tokio::test]
2540 async fn executor_from_pool_and_conn() {
2541 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2542 let _pool_exec: Executor<'_, Sqlite> = (&pool).into();
2543
2544 let mut conn = pool.acquire().await.unwrap();
2545 let _conn_exec: Executor<'_, Sqlite> = (&mut *conn).into();
2546 }
2547
2548 #[tokio::test]
2549 async fn executor_into_executor_identity() {
2550 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2551 let exec = Executor::Pool(&pool);
2552 let _same: Executor<'_, Sqlite> = exec.into_executor();
2553 }
2554
2555 #[tokio::test]
2556 async fn model_hooks_defaults_are_noops() {
2557 let mut dummy = HookDummy;
2558 dummy.before_save().await.unwrap();
2559 dummy.after_save().await.unwrap();
2560 }
2561
2562 #[tokio::test]
2563 async fn model_hooks_default_impls_cover_trait_body() {
2564 let mut dummy = HookDummy;
2565 ModelHooks::before_save(&mut dummy).await.unwrap();
2566 ModelHooks::after_save(&mut dummy).await.unwrap();
2567 default_model_hook_result().unwrap();
2568 }
2569
2570 #[tokio::test]
2571 async fn eager_load_default_impl_covers_trait_body() {
2572 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2573 let mut models = vec![SoftDeleteModel {
2574 id: 1,
2575 status: "active".to_string(),
2576 deleted_at: None,
2577 }];
2578 <SoftDeleteModel as Model<Sqlite>>::eager_load(
2579 &mut models,
2580 "posts",
2581 Executor::Pool(&pool),
2582 )
2583 .await
2584 .unwrap();
2585 }
2586
2587 #[tokio::test]
2588 async fn query_builder_include_uses_conn_executor() {
2589 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2590 sqlx::query(&SyncModel::create_table_sql())
2591 .execute(&pool)
2592 .await
2593 .unwrap();
2594
2595 let mut conn = pool.acquire().await.unwrap();
2596 let results = SyncModel::find(&mut *conn)
2597 .include("missing")
2598 .all()
2599 .await
2600 .unwrap();
2601 assert!(results.is_empty());
2602 }
2603
2604 #[tokio::test]
2605 async fn bulk_update_rejects_non_object() {
2606 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2607 let err = SoftDeleteModel::find_in_pool(&pool)
2608 .filter_eq("id", 1)
2609 .update(serde_json::json!("bad"))
2610 .await
2611 .unwrap_err();
2612 assert!(
2613 err.to_string()
2614 .contains("Bulk update requires a JSON object")
2615 );
2616 }
2617
2618 #[tokio::test]
2619 async fn bulk_update_rejects_unsupported_value_type() {
2620 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2621 let err = SoftDeleteModel::find_in_pool(&pool)
2622 .filter_eq("id", 1)
2623 .update(serde_json::json!({ "status": ["bad"] }))
2624 .await
2625 .unwrap_err();
2626 assert!(err.to_string().contains("Unsupported type in bulk update"));
2627 }
2628
2629 #[tokio::test]
2630 async fn bulk_update_binds_integers() {
2631 let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
2632 sqlx::query("CREATE TABLE users (id INTEGER PRIMARY KEY, age INTEGER, deleted_at TEXT)")
2633 .execute(&pool)
2634 .await
2635 .unwrap();
2636 sqlx::query("INSERT INTO users (id, age, deleted_at) VALUES (1, 10, NULL)")
2637 .execute(&pool)
2638 .await
2639 .unwrap();
2640
2641 let rows = SoftDeleteModel::find_in_pool(&pool)
2642 .filter_eq("id", 1)
2643 .update(serde_json::json!({ "age": 11 }))
2644 .await
2645 .unwrap();
2646 assert_eq!(rows, 1);
2647 }
2648}