1use std::borrow::Cow;
37use std::collections::HashSet;
38
39use serde::{Deserialize, Serialize};
40
41use crate::error::{QueryError, QueryResult};
42use crate::sql::DatabaseType;
43
/// When a trigger runs relative to its triggering event.
///
/// Rendered into SQL by `TriggerTiming::to_sql` as `BEFORE`, `AFTER` or
/// `INSTEAD OF`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TriggerTiming {
    /// Rendered as `BEFORE`.
    Before,
    /// Rendered as `AFTER`.
    After,
    /// Rendered as `INSTEAD OF`.
    InsteadOf,
}
54
55impl TriggerTiming {
56 pub fn to_sql(&self) -> &'static str {
58 match self {
59 Self::Before => "BEFORE",
60 Self::After => "AFTER",
61 Self::InsteadOf => "INSTEAD OF",
62 }
63 }
64}
65
/// The data-modification event that fires a trigger.
///
/// Rendered into SQL by `TriggerEvent::to_sql` as the upper-case keyword of
/// the same name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TriggerEvent {
    /// Rendered as `INSERT`.
    Insert,
    /// Rendered as `UPDATE`.
    Update,
    /// Rendered as `DELETE`.
    Delete,
    /// Rendered as `TRUNCATE`.
    Truncate,
}
78
79impl TriggerEvent {
80 pub fn to_sql(&self) -> &'static str {
82 match self {
83 Self::Insert => "INSERT",
84 Self::Update => "UPDATE",
85 Self::Delete => "DELETE",
86 Self::Truncate => "TRUNCATE",
87 }
88 }
89}
90
/// Granularity at which a trigger fires.
///
/// Defaults to [`TriggerLevel::Row`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
pub enum TriggerLevel {
    /// Rendered as `FOR EACH ROW`.
    #[default]
    Row,
    /// Rendered as `FOR EACH STATEMENT`.
    Statement,
}
100
101impl TriggerLevel {
102 pub fn to_sql(&self) -> &'static str {
104 match self {
105 Self::Row => "FOR EACH ROW",
106 Self::Statement => "FOR EACH STATEMENT",
107 }
108 }
109}
110
/// Column list for an `UPDATE OF col, ...` trigger event.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UpdateOf {
    /// Column names, emitted verbatim (no quoting) and joined with `", "`.
    pub columns: Vec<String>,
}
117
118impl UpdateOf {
119 pub fn new(columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
121 Self {
122 columns: columns.into_iter().map(Into::into).collect(),
123 }
124 }
125
126 pub fn to_sql(&self) -> String {
128 if self.columns.is_empty() {
129 String::new()
130 } else {
131 format!(" OF {}", self.columns.join(", "))
132 }
133 }
134}
135
/// A boolean SQL expression used as a trigger's `WHEN` condition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerCondition {
    /// Raw SQL expression text; it is inserted into generated SQL unmodified.
    pub expression: String,
}
142
143impl TriggerCondition {
144 pub fn new(expression: impl Into<String>) -> Self {
146 Self {
147 expression: expression.into(),
148 }
149 }
150
151 pub fn column_changed(column: &str) -> Self {
153 Self::new(format!("OLD.{} IS DISTINCT FROM NEW.{}", column, column))
154 }
155
156 pub fn new_not_null(column: &str) -> Self {
158 Self::new(format!("NEW.{} IS NOT NULL", column))
159 }
160
161 pub fn old_was_null(column: &str) -> Self {
163 Self::new(format!("OLD.{} IS NULL", column))
164 }
165
166 pub fn and(self, other: Self) -> Self {
168 Self::new(format!("({}) AND ({})", self.expression, other.expression))
169 }
170
171 pub fn or(self, other: Self) -> Self {
173 Self::new(format!("({}) OR ({})", self.expression, other.expression))
174 }
175}
176
/// What a trigger does when it fires.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TriggerAction {
    /// Invoke a named function/procedure with the given arguments.
    ExecuteFunction {
        /// Function name, emitted verbatim.
        name: String,
        /// Argument texts, emitted verbatim and joined with `", "`.
        args: Vec<String>,
    },
    /// Run SQL statements written directly in the trigger body.
    InlineSql {
        /// Statement texts; the generators in this file append the `;`
        /// terminators themselves.
        statements: Vec<String>,
    },
    /// Invoke a named function/procedure without arguments.
    FunctionReference {
        /// Function name, emitted verbatim.
        name: String,
    },
}
198
199impl TriggerAction {
200 pub fn function(name: impl Into<String>) -> Self {
202 Self::ExecuteFunction {
203 name: name.into(),
204 args: Vec::new(),
205 }
206 }
207
208 pub fn function_with_args(
210 name: impl Into<String>,
211 args: impl IntoIterator<Item = impl Into<String>>,
212 ) -> Self {
213 Self::ExecuteFunction {
214 name: name.into(),
215 args: args.into_iter().map(Into::into).collect(),
216 }
217 }
218
219 pub fn inline_sql(statements: impl IntoIterator<Item = impl Into<String>>) -> Self {
221 Self::InlineSql {
222 statements: statements.into_iter().map(Into::into).collect(),
223 }
224 }
225}
226
/// A database trigger definition that can be rendered to dialect-specific
/// `CREATE TRIGGER` / `DROP TRIGGER` SQL.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Trigger {
    /// Trigger name, emitted verbatim.
    pub name: String,
    /// Optional schema used to qualify the trigger and/or table name.
    pub schema: Option<String>,
    /// Table (or view) the trigger is attached to.
    pub table: String,
    /// When the trigger fires relative to the event.
    pub timing: TriggerTiming,
    /// Events that fire the trigger. Stored as a set, so duplicates collapse
    /// and no insertion order is retained.
    pub events: HashSet<TriggerEvent>,
    /// Row-level or statement-level firing.
    pub level: TriggerLevel,
    /// Optional column list rendered after the `UPDATE` event.
    pub update_of: Option<UpdateOf>,
    /// Optional `WHEN` condition.
    pub condition: Option<TriggerCondition>,
    /// What the trigger executes.
    pub action: TriggerAction,
    /// Enabled flag. None of the SQL generators in this file read it.
    pub enabled: bool,
    /// Free-form comment. None of the SQL generators in this file read it.
    pub comment: Option<String>,
}
253
254impl Trigger {
255 pub fn builder(name: impl Into<String>) -> TriggerBuilder {
257 TriggerBuilder::new(name)
258 }
259
260 pub fn qualified_name(&self) -> Cow<'_, str> {
262 match &self.schema {
263 Some(schema) => Cow::Owned(format!("{}.{}", schema, self.name)),
264 None => Cow::Borrowed(&self.name),
265 }
266 }
267
268 pub fn qualified_table(&self) -> Cow<'_, str> {
270 match &self.schema {
271 Some(schema) => Cow::Owned(format!("{}.{}", schema, self.table)),
272 None => Cow::Borrowed(&self.table),
273 }
274 }
275
276 pub fn to_postgres_sql(&self) -> QueryResult<String> {
278 let mut sql = String::with_capacity(256);
279
280 sql.push_str("CREATE TRIGGER ");
281 sql.push_str(&self.name);
282 sql.push('\n');
283
284 sql.push_str(" ");
286 sql.push_str(self.timing.to_sql());
287 sql.push(' ');
288
289 let events: Vec<_> = self.events.iter().collect();
291 for (i, event) in events.iter().enumerate() {
292 if i > 0 {
293 sql.push_str(" OR ");
294 }
295 sql.push_str(event.to_sql());
296 if *event == &TriggerEvent::Update {
297 if let Some(ref update_of) = self.update_of {
298 sql.push_str(&update_of.to_sql());
299 }
300 }
301 }
302
303 sql.push_str("\n ON ");
305 sql.push_str(&self.qualified_table());
306 sql.push('\n');
307
308 sql.push_str(" ");
310 sql.push_str(self.level.to_sql());
311 sql.push('\n');
312
313 if let Some(ref condition) = self.condition {
315 sql.push_str(" WHEN (");
316 sql.push_str(&condition.expression);
317 sql.push_str(")\n");
318 }
319
320 sql.push_str(" EXECUTE ");
322 match &self.action {
323 TriggerAction::ExecuteFunction { name, args: _ }
324 | TriggerAction::FunctionReference { name } => {
325 sql.push_str("FUNCTION ");
326 sql.push_str(name);
327 sql.push('(');
328 if let TriggerAction::ExecuteFunction { args, .. } = &self.action {
329 sql.push_str(&args.join(", "));
330 }
331 sql.push(')');
332 }
333 TriggerAction::InlineSql { .. } => {
334 return Err(QueryError::unsupported(
335 "PostgreSQL triggers require a function, not inline SQL",
336 ));
337 }
338 }
339
340 sql.push(';');
341
342 Ok(sql)
343 }
344
345 pub fn to_mysql_sql(&self) -> QueryResult<String> {
347 if self.level == TriggerLevel::Statement {
349 return Err(QueryError::unsupported(
350 "MySQL does not support statement-level triggers",
351 ));
352 }
353
354 if self.timing == TriggerTiming::InsteadOf {
356 return Err(QueryError::unsupported(
357 "MySQL does not support INSTEAD OF triggers",
358 ));
359 }
360
361 if self.events.len() != 1 {
363 return Err(QueryError::unsupported(
364 "MySQL triggers can only have one triggering event. Create separate triggers for each event.",
365 ));
366 }
367
368 let event = self.events.iter().next().unwrap();
369
370 let mut sql = String::with_capacity(256);
371
372 sql.push_str("CREATE TRIGGER ");
373 sql.push_str(&self.name);
374 sql.push('\n');
375
376 sql.push_str(" ");
378 sql.push_str(self.timing.to_sql());
379 sql.push(' ');
380 sql.push_str(event.to_sql());
381 sql.push('\n');
382
383 sql.push_str(" ON `");
385 sql.push_str(&self.table);
386 sql.push_str("`\n");
387
388 sql.push_str(" FOR EACH ROW\n");
390
391 match &self.action {
393 TriggerAction::InlineSql { statements } => {
394 if statements.len() == 1 {
395 sql.push_str(" ");
396 sql.push_str(&statements[0]);
397 } else {
398 sql.push_str("BEGIN\n");
399 for stmt in statements {
400 sql.push_str(" ");
401 sql.push_str(stmt);
402 sql.push_str(";\n");
403 }
404 sql.push_str("END");
405 }
406 }
407 TriggerAction::ExecuteFunction { name, args } => {
408 sql.push_str(" CALL ");
409 sql.push_str(name);
410 sql.push('(');
411 sql.push_str(&args.join(", "));
412 sql.push(')');
413 }
414 TriggerAction::FunctionReference { name } => {
415 sql.push_str(" CALL ");
416 sql.push_str(name);
417 sql.push_str("()");
418 }
419 }
420
421 sql.push(';');
422
423 Ok(sql)
424 }
425
426 pub fn to_sqlite_sql(&self) -> QueryResult<String> {
428 if self.level == TriggerLevel::Statement {
430 return Err(QueryError::unsupported(
431 "SQLite does not support statement-level triggers",
432 ));
433 }
434
435 let mut sql = String::with_capacity(256);
436
437 sql.push_str("CREATE TRIGGER ");
438 if self.schema.is_some() {
439 return Err(QueryError::unsupported(
440 "SQLite does not support schema-qualified trigger names",
441 ));
442 }
443 sql.push_str(&self.name);
444 sql.push('\n');
445
446 sql.push_str(" ");
448 sql.push_str(self.timing.to_sql());
449 sql.push(' ');
450
451 if self.events.len() != 1 {
453 return Err(QueryError::unsupported(
454 "SQLite triggers can only have one triggering event",
455 ));
456 }
457
458 let event = self.events.iter().next().unwrap();
459 sql.push_str(event.to_sql());
460
461 if *event == TriggerEvent::Update {
462 if let Some(ref update_of) = self.update_of {
463 sql.push_str(&update_of.to_sql());
464 }
465 }
466
467 sql.push_str("\n ON `");
469 sql.push_str(&self.table);
470 sql.push_str("`\n");
471
472 sql.push_str(" FOR EACH ROW\n");
474
475 if let Some(ref condition) = self.condition {
477 sql.push_str(" WHEN ");
478 sql.push_str(&condition.expression);
479 sql.push('\n');
480 }
481
482 sql.push_str("BEGIN\n");
484 match &self.action {
485 TriggerAction::InlineSql { statements } => {
486 for stmt in statements {
487 sql.push_str(" ");
488 sql.push_str(stmt);
489 sql.push_str(";\n");
490 }
491 }
492 TriggerAction::ExecuteFunction { .. } | TriggerAction::FunctionReference { .. } => {
493 return Err(QueryError::unsupported(
494 "SQLite triggers require inline SQL, not function calls",
495 ));
496 }
497 }
498 sql.push_str("END;");
499
500 Ok(sql)
501 }
502
503 pub fn to_mssql_sql(&self) -> QueryResult<String> {
505 if self.timing == TriggerTiming::Before {
507 return Err(QueryError::unsupported(
508 "SQL Server does not support BEFORE triggers. Use INSTEAD OF or AFTER triggers.",
509 ));
510 }
511
512 let mut sql = String::with_capacity(256);
513
514 sql.push_str("CREATE TRIGGER ");
515 sql.push_str(&self.qualified_name());
516 sql.push('\n');
517
518 sql.push_str("ON ");
520 sql.push_str(&self.qualified_table());
521 sql.push('\n');
522
523 sql.push_str(self.timing.to_sql());
525 sql.push(' ');
526
527 let events: Vec<_> = self.events.iter().collect();
529 for (i, event) in events.iter().enumerate() {
530 if i > 0 {
531 sql.push_str(", ");
532 }
533 sql.push_str(event.to_sql());
534 }
535 sql.push('\n');
536
537 sql.push_str("AS\n");
539 sql.push_str("BEGIN\n");
540 sql.push_str(" SET NOCOUNT ON;\n");
541
542 match &self.action {
544 TriggerAction::InlineSql { statements } => {
545 for stmt in statements {
546 sql.push_str(" ");
547 sql.push_str(stmt);
548 sql.push_str(";\n");
549 }
550 }
551 TriggerAction::ExecuteFunction { name, args } => {
552 sql.push_str(" EXEC ");
553 sql.push_str(name);
554 if !args.is_empty() {
555 sql.push(' ');
556 sql.push_str(&args.join(", "));
557 }
558 sql.push_str(";\n");
559 }
560 TriggerAction::FunctionReference { name } => {
561 sql.push_str(" EXEC ");
562 sql.push_str(name);
563 sql.push_str(";\n");
564 }
565 }
566
567 sql.push_str("END;");
568
569 Ok(sql)
570 }
571
572 pub fn to_sql(&self, db_type: DatabaseType) -> QueryResult<String> {
574 match db_type {
575 DatabaseType::PostgreSQL => self.to_postgres_sql(),
576 DatabaseType::MySQL => self.to_mysql_sql(),
577 DatabaseType::SQLite => self.to_sqlite_sql(),
578 DatabaseType::MSSQL => self.to_mssql_sql(),
579 }
580 }
581
582 pub fn drop_sql(&self, db_type: DatabaseType) -> String {
584 match db_type {
585 DatabaseType::PostgreSQL => {
586 format!(
587 "DROP TRIGGER IF EXISTS {} ON {};",
588 self.name,
589 self.qualified_table()
590 )
591 }
592 DatabaseType::MySQL => {
593 format!("DROP TRIGGER IF EXISTS {};", self.name)
594 }
595 DatabaseType::SQLite => {
596 format!("DROP TRIGGER IF EXISTS {};", self.name)
597 }
598 DatabaseType::MSSQL => {
599 format!("DROP TRIGGER IF EXISTS {};", self.qualified_name())
600 }
601 }
602 }
603}
604
/// Fluent builder for [`Trigger`].
///
/// `table` and `action` start unset and `events` starts empty;
/// `TriggerBuilder::build` rejects a builder where any of the three is still
/// missing.
#[derive(Debug, Clone)]
pub struct TriggerBuilder {
    // Trigger name supplied to `new`.
    name: String,
    // Optional schema qualifier.
    schema: Option<String>,
    // Target table/view; required by `build`.
    table: Option<String>,
    // Defaults to `TriggerTiming::After` in `new`.
    timing: TriggerTiming,
    // Accumulated events; must be non-empty at `build`.
    events: HashSet<TriggerEvent>,
    // Defaults to `TriggerLevel::Row` in `new`.
    level: TriggerLevel,
    // Optional `UPDATE OF` column list.
    update_of: Option<UpdateOf>,
    // Optional `WHEN` condition.
    condition: Option<TriggerCondition>,
    // Trigger action; required by `build`.
    action: Option<TriggerAction>,
    // Defaults to `true` in `new`.
    enabled: bool,
    // Optional free-form comment.
    comment: Option<String>,
}
620
621impl TriggerBuilder {
622 pub fn new(name: impl Into<String>) -> Self {
624 Self {
625 name: name.into(),
626 schema: None,
627 table: None,
628 timing: TriggerTiming::After,
629 events: HashSet::new(),
630 level: TriggerLevel::Row,
631 update_of: None,
632 condition: None,
633 action: None,
634 enabled: true,
635 comment: None,
636 }
637 }
638
639 pub fn schema(mut self, schema: impl Into<String>) -> Self {
641 self.schema = Some(schema.into());
642 self
643 }
644
645 pub fn on_table(mut self, table: impl Into<String>) -> Self {
647 self.table = Some(table.into());
648 self
649 }
650
651 pub fn on_view(self, view: impl Into<String>) -> Self {
653 self.on_table(view)
654 }
655
656 pub fn timing(mut self, timing: TriggerTiming) -> Self {
658 self.timing = timing;
659 self
660 }
661
662 pub fn before(self) -> Self {
664 self.timing(TriggerTiming::Before)
665 }
666
667 pub fn after(self) -> Self {
669 self.timing(TriggerTiming::After)
670 }
671
672 pub fn instead_of(self) -> Self {
674 self.timing(TriggerTiming::InsteadOf)
675 }
676
677 pub fn event(mut self, event: TriggerEvent) -> Self {
679 self.events.insert(event);
680 self
681 }
682
683 pub fn events(mut self, events: impl IntoIterator<Item = TriggerEvent>) -> Self {
685 self.events.extend(events);
686 self
687 }
688
689 pub fn on_insert(self) -> Self {
691 self.event(TriggerEvent::Insert)
692 }
693
694 pub fn on_update(self) -> Self {
696 self.event(TriggerEvent::Update)
697 }
698
699 pub fn on_delete(self) -> Self {
701 self.event(TriggerEvent::Delete)
702 }
703
704 pub fn on_truncate(self) -> Self {
706 self.event(TriggerEvent::Truncate)
707 }
708
709 pub fn level(mut self, level: TriggerLevel) -> Self {
711 self.level = level;
712 self
713 }
714
715 pub fn for_each_row(self) -> Self {
717 self.level(TriggerLevel::Row)
718 }
719
720 pub fn for_each_statement(self) -> Self {
722 self.level(TriggerLevel::Statement)
723 }
724
725 pub fn update_of(mut self, columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
727 self.update_of = Some(UpdateOf::new(columns));
728 self
729 }
730
731 pub fn when(mut self, condition: TriggerCondition) -> Self {
733 self.condition = Some(condition);
734 self
735 }
736
737 pub fn when_expr(self, expression: impl Into<String>) -> Self {
739 self.when(TriggerCondition::new(expression))
740 }
741
742 pub fn execute_function(mut self, name: impl Into<String>) -> Self {
744 self.action = Some(TriggerAction::function(name));
745 self
746 }
747
748 pub fn execute_function_with_args(
750 mut self,
751 name: impl Into<String>,
752 args: impl IntoIterator<Item = impl Into<String>>,
753 ) -> Self {
754 self.action = Some(TriggerAction::function_with_args(name, args));
755 self
756 }
757
758 pub fn execute_sql(mut self, statements: impl IntoIterator<Item = impl Into<String>>) -> Self {
760 self.action = Some(TriggerAction::inline_sql(statements));
761 self
762 }
763
764 pub fn enabled(mut self, enabled: bool) -> Self {
766 self.enabled = enabled;
767 self
768 }
769
770 pub fn comment(mut self, comment: impl Into<String>) -> Self {
772 self.comment = Some(comment.into());
773 self
774 }
775
776 pub fn build(self) -> QueryResult<Trigger> {
778 let table = self.table.ok_or_else(|| {
779 QueryError::invalid_input("table", "Trigger must specify a table with on_table()")
780 })?;
781
782 if self.events.is_empty() {
783 return Err(QueryError::invalid_input(
784 "events",
785 "Trigger must have at least one event (on_insert, on_update, on_delete)",
786 ));
787 }
788
789 let action = self.action.ok_or_else(|| {
790 QueryError::invalid_input(
791 "action",
792 "Trigger must have an action (execute_function or execute_sql)",
793 )
794 })?;
795
796 Ok(Trigger {
797 name: self.name,
798 schema: self.schema,
799 table,
800 timing: self.timing,
801 events: self.events,
802 level: self.level,
803 update_of: self.update_of,
804 condition: self.condition,
805 action,
806 enabled: self.enabled,
807 comment: self.comment,
808 })
809 }
810}
811
812pub mod patterns {
814 use super::*;
815
816 pub fn audit_trigger(
824 table: &str,
825 audit_table: &str,
826 events: impl IntoIterator<Item = TriggerEvent>,
827 ) -> TriggerBuilder {
828 let _ = audit_table; Trigger::builder(format!("{}_audit_trigger", table))
833 .on_table(table)
834 .after()
835 .events(events)
836 .for_each_row()
837 .execute_function("audit_trigger_func")
838 }
839
840 pub fn soft_delete_trigger(table: &str, deleted_at_column: &str) -> TriggerBuilder {
842 Trigger::builder(format!("{}_soft_delete", table))
843 .on_table(table)
844 .instead_of()
845 .on_delete()
846 .for_each_row()
847 .execute_sql([format!(
848 "UPDATE {} SET {} = NOW() WHERE id = OLD.id",
849 table, deleted_at_column
850 )])
851 }
852
853 pub fn updated_at_trigger(table: &str, column: &str) -> TriggerBuilder {
855 Trigger::builder(format!("{}_updated_at", table))
856 .on_table(table)
857 .before()
858 .on_update()
859 .for_each_row()
860 .execute_sql([format!("NEW.{} = NOW()", column)])
861 }
862
863 pub fn validation_trigger(
865 table: &str,
866 name: &str,
867 condition: &str,
868 error_message: &str,
869 ) -> TriggerBuilder {
870 Trigger::builder(name)
871 .on_table(table)
872 .before()
873 .on_insert()
874 .on_update()
875 .for_each_row()
876 .when_expr(condition)
877 .execute_sql([format!("RAISE EXCEPTION '{}'", error_message)])
878 }
879}
880
881pub mod mongodb {
883 use super::*;
884
885 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
887 pub enum ChangeType {
888 Insert,
890 Update,
892 Replace,
894 Delete,
896 Drop,
898 Rename,
900 DropDatabase,
902 Invalidate,
904 }
905
906 impl ChangeType {
907 pub fn as_str(&self) -> &'static str {
909 match self {
910 Self::Insert => "insert",
911 Self::Update => "update",
912 Self::Replace => "replace",
913 Self::Delete => "delete",
914 Self::Drop => "drop",
915 Self::Rename => "rename",
916 Self::DropDatabase => "dropDatabase",
917 Self::Invalidate => "invalidate",
918 }
919 }
920 }
921
922 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
924 pub struct ChangeStreamOptions {
925 pub resume_after: Option<String>,
927 pub start_at_operation_time: Option<String>,
929 pub full_document: FullDocument,
931 pub full_document_before_change: FullDocumentBeforeChange,
933 pub max_await_time_ms: Option<u64>,
935 pub batch_size: Option<u32>,
937 }
938
939 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
941 pub enum FullDocument {
942 #[default]
944 Default,
945 UpdateLookup,
947 WhenAvailable,
949 Required,
951 }
952
953 impl FullDocument {
954 pub fn as_str(&self) -> &'static str {
956 match self {
957 Self::Default => "default",
958 Self::UpdateLookup => "updateLookup",
959 Self::WhenAvailable => "whenAvailable",
960 Self::Required => "required",
961 }
962 }
963 }
964
965 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
967 pub enum FullDocumentBeforeChange {
968 #[default]
970 Off,
971 WhenAvailable,
973 Required,
975 }
976
977 impl FullDocumentBeforeChange {
978 pub fn as_str(&self) -> &'static str {
980 match self {
981 Self::Off => "off",
982 Self::WhenAvailable => "whenAvailable",
983 Self::Required => "required",
984 }
985 }
986 }
987
988 #[derive(Debug, Clone, Serialize, Deserialize)]
990 pub struct ChangeStreamPipeline {
991 pub stages: Vec<PipelineStage>,
993 }
994
995 impl ChangeStreamPipeline {
996 pub fn new() -> Self {
998 Self { stages: Vec::new() }
999 }
1000
1001 pub fn match_stage(mut self, filter: serde_json::Value) -> Self {
1003 self.stages.push(PipelineStage::Match(filter));
1004 self
1005 }
1006
1007 pub fn operation_types(self, types: &[ChangeType]) -> Self {
1009 let type_strs: Vec<_> = types.iter().map(|t| t.as_str()).collect();
1010 self.match_stage(serde_json::json!({
1011 "operationType": { "$in": type_strs }
1012 }))
1013 }
1014
1015 pub fn namespace(self, db: &str, collection: &str) -> Self {
1017 self.match_stage(serde_json::json!({
1018 "ns": { "db": db, "coll": collection }
1019 }))
1020 }
1021
1022 pub fn project(mut self, projection: serde_json::Value) -> Self {
1024 self.stages.push(PipelineStage::Project(projection));
1025 self
1026 }
1027 }
1028
1029 impl Default for ChangeStreamPipeline {
1030 fn default() -> Self {
1031 Self::new()
1032 }
1033 }
1034
1035 #[derive(Debug, Clone, Serialize, Deserialize)]
1037 pub enum PipelineStage {
1038 Match(serde_json::Value),
1040 Project(serde_json::Value),
1042 AddFields(serde_json::Value),
1044 ReplaceRoot(serde_json::Value),
1046 Redact(serde_json::Value),
1048 }
1049
1050 #[derive(Debug, Clone, Default)]
1052 pub struct ChangeStreamBuilder {
1053 collection: Option<String>,
1054 database: Option<String>,
1055 pipeline: ChangeStreamPipeline,
1056 options: ChangeStreamOptions,
1057 }
1058
1059 impl ChangeStreamBuilder {
1060 pub fn new() -> Self {
1062 Self::default()
1063 }
1064
1065 pub fn collection(mut self, name: impl Into<String>) -> Self {
1067 self.collection = Some(name.into());
1068 self
1069 }
1070
1071 pub fn database(mut self, name: impl Into<String>) -> Self {
1073 self.database = Some(name.into());
1074 self
1075 }
1076
1077 pub fn operations(mut self, types: &[ChangeType]) -> Self {
1079 self.pipeline = self.pipeline.operation_types(types);
1080 self
1081 }
1082
1083 pub fn filter(mut self, filter: serde_json::Value) -> Self {
1085 self.pipeline = self.pipeline.match_stage(filter);
1086 self
1087 }
1088
1089 pub fn full_document(mut self, policy: FullDocument) -> Self {
1091 self.options.full_document = policy;
1092 self
1093 }
1094
1095 pub fn full_document_before_change(mut self, policy: FullDocumentBeforeChange) -> Self {
1097 self.options.full_document_before_change = policy;
1098 self
1099 }
1100
1101 pub fn resume_after(mut self, token: impl Into<String>) -> Self {
1103 self.options.resume_after = Some(token.into());
1104 self
1105 }
1106
1107 pub fn max_await_time_ms(mut self, ms: u64) -> Self {
1109 self.options.max_await_time_ms = Some(ms);
1110 self
1111 }
1112
1113 pub fn batch_size(mut self, size: u32) -> Self {
1115 self.options.batch_size = Some(size);
1116 self
1117 }
1118
1119 pub fn build_pipeline(&self) -> &[PipelineStage] {
1121 &self.pipeline.stages
1122 }
1123
1124 pub fn build_options(&self) -> &ChangeStreamOptions {
1126 &self.options
1127 }
1128 }
1129}
1130
// Unit tests for the trigger builder, the per-dialect SQL generators and the
// MongoDB change-stream builder.
#[cfg(test)]
mod tests {
    use super::*;

    // The builder copies every configured field into the resulting `Trigger`.
    #[test]
    fn test_trigger_builder() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_update()
            .on_delete()
            .for_each_row()
            .execute_function("audit_log_changes")
            .build()
            .unwrap();

        assert_eq!(trigger.name, "audit_users");
        assert_eq!(trigger.table, "users");
        assert_eq!(trigger.timing, TriggerTiming::After);
        assert!(trigger.events.contains(&TriggerEvent::Update));
        assert!(trigger.events.contains(&TriggerEvent::Delete));
        assert_eq!(trigger.level, TriggerLevel::Row);
    }

    // PostgreSQL output contains the expected clauses; event order is not
    // asserted here.
    #[test]
    fn test_postgres_trigger_sql() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_insert()
            .on_update()
            .for_each_row()
            .execute_function("audit_func")
            .build()
            .unwrap();

        let sql = trigger.to_postgres_sql().unwrap();
        assert!(sql.contains("CREATE TRIGGER audit_users"));
        assert!(sql.contains("AFTER"));
        assert!(sql.contains("ON users"));
        assert!(sql.contains("FOR EACH ROW"));
        assert!(sql.contains("EXECUTE FUNCTION audit_func()"));
    }

    // MySQL output backtick-quotes the table and accepts inline SQL.
    #[test]
    fn test_mysql_trigger_sql() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_insert()
            .for_each_row()
            .execute_sql(["INSERT INTO audit_log VALUES (NEW.id, 'INSERT')"])
            .build()
            .unwrap();

        let sql = trigger.to_mysql_sql().unwrap();
        assert!(sql.contains("CREATE TRIGGER audit_users"));
        assert!(sql.contains("AFTER INSERT"));
        assert!(sql.contains("ON `users`"));
        assert!(sql.contains("FOR EACH ROW"));
    }

    // The MySQL generator rejects triggers with more than one event.
    #[test]
    fn test_mysql_multiple_events_error() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_insert()
            .on_update()
            .execute_sql(["SELECT 1"])
            .build()
            .unwrap();

        let result = trigger.to_mysql_sql();
        assert!(result.is_err());
    }

    // SQLite output renders the WHEN condition without parentheses and wraps
    // the body in BEGIN ... END;.
    #[test]
    fn test_sqlite_trigger_sql() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_delete()
            .for_each_row()
            .when_expr("OLD.important = 1")
            .execute_sql(["INSERT INTO deleted_users SELECT * FROM OLD"])
            .build()
            .unwrap();

        let sql = trigger.to_sqlite_sql().unwrap();
        assert!(sql.contains("CREATE TRIGGER audit_users"));
        assert!(sql.contains("AFTER DELETE"));
        assert!(sql.contains("ON `users`"));
        assert!(sql.contains("WHEN OLD.important = 1"));
        assert!(sql.contains("BEGIN"));
        assert!(sql.contains("END;"));
    }

    // SQL Server output schema-qualifies both trigger and table. Either event
    // order is accepted because the events are stored in a HashSet.
    #[test]
    fn test_mssql_trigger_sql() {
        let trigger = Trigger::builder("audit_users")
            .schema("dbo")
            .on_table("users")
            .after()
            .on_insert()
            .on_update()
            .execute_sql(["INSERT INTO audit_log SELECT * FROM inserted"])
            .build()
            .unwrap();

        let sql = trigger.to_mssql_sql().unwrap();
        assert!(sql.contains("CREATE TRIGGER dbo.audit_users"));
        assert!(sql.contains("ON dbo.users"));
        assert!(sql.contains("AFTER INSERT, UPDATE") || sql.contains("AFTER UPDATE, INSERT"));
        assert!(sql.contains("SET NOCOUNT ON"));
    }

    // The SQL Server generator rejects BEFORE timing.
    #[test]
    fn test_mssql_before_error() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .before()
            .on_insert()
            .execute_sql(["SELECT 1"])
            .build()
            .unwrap();

        let result = trigger.to_mssql_sql();
        assert!(result.is_err());
    }

    // DROP statements: PostgreSQL names the table, MySQL does not.
    #[test]
    fn test_drop_trigger_sql() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_insert()
            .execute_function("audit_func")
            .build()
            .unwrap();

        let pg_drop = trigger.drop_sql(DatabaseType::PostgreSQL);
        assert_eq!(pg_drop, "DROP TRIGGER IF EXISTS audit_users ON users;");

        let mysql_drop = trigger.drop_sql(DatabaseType::MySQL);
        assert_eq!(mysql_drop, "DROP TRIGGER IF EXISTS audit_users;");
    }

    // `and` keeps both operand expressions in the combined condition.
    #[test]
    fn test_trigger_condition() {
        let cond = TriggerCondition::column_changed("email")
            .and(TriggerCondition::new_not_null("verified"));

        assert!(
            cond.expression
                .contains("OLD.email IS DISTINCT FROM NEW.email")
        );
        assert!(cond.expression.contains("NEW.verified IS NOT NULL"));
    }

    // The column list renders with a leading space and comma separators.
    #[test]
    fn test_update_of() {
        let update_of = UpdateOf::new(["email", "password"]);
        assert_eq!(update_of.to_sql(), " OF email, password");
    }

    // The PostgreSQL generator attaches the column list to the UPDATE event.
    #[test]
    fn test_trigger_with_update_of() {
        let trigger = Trigger::builder("sensitive_update")
            .on_table("users")
            .before()
            .on_update()
            .update_of(["email", "password"])
            .execute_function("validate_sensitive_update")
            .build()
            .unwrap();

        let sql = trigger.to_postgres_sql().unwrap();
        assert!(sql.contains("UPDATE OF email, password"));
    }

    // INSTEAD OF timing on a view target renders for PostgreSQL.
    #[test]
    fn test_instead_of_trigger() {
        let trigger = Trigger::builder("view_insert")
            .on_view("user_view")
            .instead_of()
            .on_insert()
            .execute_function("handle_view_insert")
            .build()
            .unwrap();

        let sql = trigger.to_postgres_sql().unwrap();
        assert!(sql.contains("INSTEAD OF INSERT"));
    }

    // `build` fails when no table was set.
    #[test]
    fn test_missing_table_error() {
        let result = Trigger::builder("test")
            .on_insert()
            .execute_function("func")
            .build();

        assert!(result.is_err());
    }

    // `build` fails when no event was added.
    #[test]
    fn test_missing_events_error() {
        let result = Trigger::builder("test")
            .on_table("users")
            .execute_function("func")
            .build();

        assert!(result.is_err());
    }

    // `build` fails when no action was set.
    #[test]
    fn test_missing_action_error() {
        let result = Trigger::builder("test")
            .on_table("users")
            .on_insert()
            .build();

        assert!(result.is_err());
    }

    // Tests for the MongoDB change-stream helpers.
    mod mongodb_tests {
        use super::super::mongodb::*;

        // Builder setters are reflected in the options returned by
        // `build_options`.
        #[test]
        fn test_change_stream_builder() {
            let builder = ChangeStreamBuilder::new()
                .collection("users")
                .operations(&[ChangeType::Insert, ChangeType::Update])
                .full_document(FullDocument::UpdateLookup)
                .batch_size(100);

            assert_eq!(
                builder.build_options().full_document,
                FullDocument::UpdateLookup
            );
            assert_eq!(builder.build_options().batch_size, Some(100));
        }

        // Change types map to their lower-case names.
        #[test]
        fn test_change_type() {
            assert_eq!(ChangeType::Insert.as_str(), "insert");
            assert_eq!(ChangeType::Update.as_str(), "update");
            assert_eq!(ChangeType::Delete.as_str(), "delete");
        }

        // Document policies map to their camelCase option values.
        #[test]
        fn test_full_document_options() {
            assert_eq!(FullDocument::Default.as_str(), "default");
            assert_eq!(FullDocument::UpdateLookup.as_str(), "updateLookup");
            assert_eq!(FullDocumentBeforeChange::Required.as_str(), "required");
        }
    }
}