1use std::borrow::Cow;
37use std::collections::HashSet;
38
39use serde::{Deserialize, Serialize};
40
41use crate::error::{QueryError, QueryResult};
42use crate::sql::DatabaseType;
43
/// Timing keyword placed before the event list in generated `CREATE TRIGGER`
/// statements.
///
/// Not every generator in this file accepts every timing: the MySQL generator
/// rejects [`TriggerTiming::InsteadOf`] and the SQL Server generator rejects
/// [`TriggerTiming::Before`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TriggerTiming {
    /// Rendered as `BEFORE`.
    Before,
    /// Rendered as `AFTER`. This is the timing a new `TriggerBuilder` starts with.
    After,
    /// Rendered as `INSTEAD OF`.
    InsteadOf,
}
54
55impl TriggerTiming {
56 pub fn to_sql(&self) -> &'static str {
58 match self {
59 Self::Before => "BEFORE",
60 Self::After => "AFTER",
61 Self::InsteadOf => "INSTEAD OF",
62 }
63 }
64}
65
/// Statement kind that activates a trigger.
///
/// A trigger stores a set of these; how many may be combined depends on the
/// generator (the MySQL and SQLite generators in this file require exactly one).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum TriggerEvent {
    /// Rendered as `INSERT`.
    Insert,
    /// Rendered as `UPDATE`; the only event that can carry an `OF <columns>` list.
    Update,
    /// Rendered as `DELETE`.
    Delete,
    /// Rendered as `TRUNCATE`.
    Truncate,
}
78
79impl TriggerEvent {
80 pub fn to_sql(&self) -> &'static str {
82 match self {
83 Self::Insert => "INSERT",
84 Self::Update => "UPDATE",
85 Self::Delete => "DELETE",
86 Self::Truncate => "TRUNCATE",
87 }
88 }
89}
90
/// Granularity clause of a trigger (`FOR EACH ROW` / `FOR EACH STATEMENT`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
pub enum TriggerLevel {
    /// Rendered as `FOR EACH ROW`; this is the `Default` value.
    #[default]
    Row,
    /// Rendered as `FOR EACH STATEMENT`. The MySQL and SQLite generators in
    /// this file reject it.
    Statement,
}
100
101impl TriggerLevel {
102 pub fn to_sql(&self) -> &'static str {
104 match self {
105 Self::Row => "FOR EACH ROW",
106 Self::Statement => "FOR EACH STATEMENT",
107 }
108 }
109}
110
/// Column list for an `UPDATE OF <columns>` trigger event.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct UpdateOf {
    /// Column names, emitted verbatim (no quoting) and joined with `", "`.
    pub columns: Vec<String>,
}
117
118impl UpdateOf {
119 pub fn new(columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
121 Self {
122 columns: columns.into_iter().map(Into::into).collect(),
123 }
124 }
125
126 pub fn to_sql(&self) -> String {
128 if self.columns.is_empty() {
129 String::new()
130 } else {
131 format!(" OF {}", self.columns.join(", "))
132 }
133 }
134}
135
/// Boolean SQL expression used as a trigger's `WHEN` condition.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerCondition {
    /// Raw SQL text of the condition; it is inserted into generated SQL as-is.
    pub expression: String,
}
142
143impl TriggerCondition {
144 pub fn new(expression: impl Into<String>) -> Self {
146 Self {
147 expression: expression.into(),
148 }
149 }
150
151 pub fn column_changed(column: &str) -> Self {
153 Self::new(format!("OLD.{} IS DISTINCT FROM NEW.{}", column, column))
154 }
155
156 pub fn new_not_null(column: &str) -> Self {
158 Self::new(format!("NEW.{} IS NOT NULL", column))
159 }
160
161 pub fn old_was_null(column: &str) -> Self {
163 Self::new(format!("OLD.{} IS NULL", column))
164 }
165
166 pub fn and(self, other: Self) -> Self {
168 Self::new(format!("({}) AND ({})", self.expression, other.expression))
169 }
170
171 pub fn or(self, other: Self) -> Self {
173 Self::new(format!("({}) OR ({})", self.expression, other.expression))
174 }
175}
176
/// What a trigger does when it fires.
///
/// The generators in this file treat the variants differently: the PostgreSQL
/// generator rejects [`TriggerAction::InlineSql`], while the SQLite generator
/// accepts only [`TriggerAction::InlineSql`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TriggerAction {
    /// Call a named routine with the given arguments.
    ExecuteFunction {
        /// Routine name, emitted verbatim.
        name: String,
        /// Argument expressions, emitted verbatim and joined with `", "`.
        args: Vec<String>,
    },
    /// Run one or more SQL statements as the trigger body.
    InlineSql {
        /// Statements, emitted verbatim in order.
        statements: Vec<String>,
    },
    /// Call a named routine without arguments.
    FunctionReference {
        /// Routine name, emitted verbatim.
        name: String,
    },
}
198
199impl TriggerAction {
200 pub fn function(name: impl Into<String>) -> Self {
202 Self::ExecuteFunction {
203 name: name.into(),
204 args: Vec::new(),
205 }
206 }
207
208 pub fn function_with_args(
210 name: impl Into<String>,
211 args: impl IntoIterator<Item = impl Into<String>>,
212 ) -> Self {
213 Self::ExecuteFunction {
214 name: name.into(),
215 args: args.into_iter().map(Into::into).collect(),
216 }
217 }
218
219 pub fn inline_sql(statements: impl IntoIterator<Item = impl Into<String>>) -> Self {
221 Self::InlineSql {
222 statements: statements.into_iter().map(Into::into).collect(),
223 }
224 }
225}
226
/// A database trigger definition that can be rendered as dialect-specific
/// `CREATE TRIGGER` / `DROP TRIGGER` SQL.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Trigger {
    /// Trigger name, emitted verbatim.
    pub name: String,
    /// Optional schema used to qualify the trigger and table names.
    pub schema: Option<String>,
    /// Name of the table (or view) the trigger is attached to.
    pub table: String,
    /// Timing keyword (`BEFORE`, `AFTER`, `INSTEAD OF`).
    pub timing: TriggerTiming,
    /// Events that activate the trigger. Stored as a set, so it carries no
    /// inherent ordering.
    pub events: HashSet<TriggerEvent>,
    /// Row-level or statement-level granularity.
    pub level: TriggerLevel,
    /// Optional column list attached to the `UPDATE` event.
    pub update_of: Option<UpdateOf>,
    /// Optional `WHEN` condition.
    pub condition: Option<TriggerCondition>,
    /// What the trigger does when it fires.
    pub action: TriggerAction,
    /// Enabled flag as set on the builder.
    // NOTE(review): no SQL generator visible in this file reads this field — confirm intended use.
    pub enabled: bool,
    /// Optional free-form comment.
    // NOTE(review): no SQL generator visible in this file reads this field — confirm intended use.
    pub comment: Option<String>,
}
253
254impl Trigger {
255 pub fn builder(name: impl Into<String>) -> TriggerBuilder {
257 TriggerBuilder::new(name)
258 }
259
260 pub fn qualified_name(&self) -> Cow<'_, str> {
262 match &self.schema {
263 Some(schema) => Cow::Owned(format!("{}.{}", schema, self.name)),
264 None => Cow::Borrowed(&self.name),
265 }
266 }
267
268 pub fn qualified_table(&self) -> Cow<'_, str> {
270 match &self.schema {
271 Some(schema) => Cow::Owned(format!("{}.{}", schema, self.table)),
272 None => Cow::Borrowed(&self.table),
273 }
274 }
275
276 pub fn to_postgres_sql(&self) -> QueryResult<String> {
278 let mut sql = String::with_capacity(256);
279
280 sql.push_str("CREATE TRIGGER ");
281 sql.push_str(&self.name);
282 sql.push('\n');
283
284 sql.push_str(" ");
286 sql.push_str(self.timing.to_sql());
287 sql.push(' ');
288
289 let events: Vec<_> = self.events.iter().collect();
291 for (i, event) in events.iter().enumerate() {
292 if i > 0 {
293 sql.push_str(" OR ");
294 }
295 sql.push_str(event.to_sql());
296 if *event == &TriggerEvent::Update {
297 if let Some(ref update_of) = self.update_of {
298 sql.push_str(&update_of.to_sql());
299 }
300 }
301 }
302
303 sql.push_str("\n ON ");
305 sql.push_str(&self.qualified_table());
306 sql.push('\n');
307
308 sql.push_str(" ");
310 sql.push_str(self.level.to_sql());
311 sql.push('\n');
312
313 if let Some(ref condition) = self.condition {
315 sql.push_str(" WHEN (");
316 sql.push_str(&condition.expression);
317 sql.push_str(")\n");
318 }
319
320 sql.push_str(" EXECUTE ");
322 match &self.action {
323 TriggerAction::ExecuteFunction { name, args: _ } | TriggerAction::FunctionReference { name } => {
324 sql.push_str("FUNCTION ");
325 sql.push_str(name);
326 sql.push('(');
327 if let TriggerAction::ExecuteFunction { args, .. } = &self.action {
328 sql.push_str(&args.join(", "));
329 }
330 sql.push(')');
331 }
332 TriggerAction::InlineSql { .. } => {
333 return Err(QueryError::unsupported(
334 "PostgreSQL triggers require a function, not inline SQL",
335 ));
336 }
337 }
338
339 sql.push(';');
340
341 Ok(sql)
342 }
343
344 pub fn to_mysql_sql(&self) -> QueryResult<String> {
346 if self.level == TriggerLevel::Statement {
348 return Err(QueryError::unsupported(
349 "MySQL does not support statement-level triggers",
350 ));
351 }
352
353 if self.timing == TriggerTiming::InsteadOf {
355 return Err(QueryError::unsupported(
356 "MySQL does not support INSTEAD OF triggers",
357 ));
358 }
359
360 if self.events.len() != 1 {
362 return Err(QueryError::unsupported(
363 "MySQL triggers can only have one triggering event. Create separate triggers for each event.",
364 ));
365 }
366
367 let event = self.events.iter().next().unwrap();
368
369 let mut sql = String::with_capacity(256);
370
371 sql.push_str("CREATE TRIGGER ");
372 sql.push_str(&self.name);
373 sql.push('\n');
374
375 sql.push_str(" ");
377 sql.push_str(self.timing.to_sql());
378 sql.push(' ');
379 sql.push_str(event.to_sql());
380 sql.push('\n');
381
382 sql.push_str(" ON `");
384 sql.push_str(&self.table);
385 sql.push_str("`\n");
386
387 sql.push_str(" FOR EACH ROW\n");
389
390 match &self.action {
392 TriggerAction::InlineSql { statements } => {
393 if statements.len() == 1 {
394 sql.push_str(" ");
395 sql.push_str(&statements[0]);
396 } else {
397 sql.push_str("BEGIN\n");
398 for stmt in statements {
399 sql.push_str(" ");
400 sql.push_str(stmt);
401 sql.push_str(";\n");
402 }
403 sql.push_str("END");
404 }
405 }
406 TriggerAction::ExecuteFunction { name, args } => {
407 sql.push_str(" CALL ");
408 sql.push_str(name);
409 sql.push('(');
410 sql.push_str(&args.join(", "));
411 sql.push(')');
412 }
413 TriggerAction::FunctionReference { name } => {
414 sql.push_str(" CALL ");
415 sql.push_str(name);
416 sql.push_str("()");
417 }
418 }
419
420 sql.push(';');
421
422 Ok(sql)
423 }
424
425 pub fn to_sqlite_sql(&self) -> QueryResult<String> {
427 if self.level == TriggerLevel::Statement {
429 return Err(QueryError::unsupported(
430 "SQLite does not support statement-level triggers",
431 ));
432 }
433
434 let mut sql = String::with_capacity(256);
435
436 sql.push_str("CREATE TRIGGER ");
437 if self.schema.is_some() {
438 return Err(QueryError::unsupported(
439 "SQLite does not support schema-qualified trigger names",
440 ));
441 }
442 sql.push_str(&self.name);
443 sql.push('\n');
444
445 sql.push_str(" ");
447 sql.push_str(self.timing.to_sql());
448 sql.push(' ');
449
450 if self.events.len() != 1 {
452 return Err(QueryError::unsupported(
453 "SQLite triggers can only have one triggering event",
454 ));
455 }
456
457 let event = self.events.iter().next().unwrap();
458 sql.push_str(event.to_sql());
459
460 if *event == TriggerEvent::Update {
461 if let Some(ref update_of) = self.update_of {
462 sql.push_str(&update_of.to_sql());
463 }
464 }
465
466 sql.push_str("\n ON `");
468 sql.push_str(&self.table);
469 sql.push_str("`\n");
470
471 sql.push_str(" FOR EACH ROW\n");
473
474 if let Some(ref condition) = self.condition {
476 sql.push_str(" WHEN ");
477 sql.push_str(&condition.expression);
478 sql.push('\n');
479 }
480
481 sql.push_str("BEGIN\n");
483 match &self.action {
484 TriggerAction::InlineSql { statements } => {
485 for stmt in statements {
486 sql.push_str(" ");
487 sql.push_str(stmt);
488 sql.push_str(";\n");
489 }
490 }
491 TriggerAction::ExecuteFunction { .. } | TriggerAction::FunctionReference { .. } => {
492 return Err(QueryError::unsupported(
493 "SQLite triggers require inline SQL, not function calls",
494 ));
495 }
496 }
497 sql.push_str("END;");
498
499 Ok(sql)
500 }
501
502 pub fn to_mssql_sql(&self) -> QueryResult<String> {
504 if self.timing == TriggerTiming::Before {
506 return Err(QueryError::unsupported(
507 "SQL Server does not support BEFORE triggers. Use INSTEAD OF or AFTER triggers.",
508 ));
509 }
510
511 let mut sql = String::with_capacity(256);
512
513 sql.push_str("CREATE TRIGGER ");
514 sql.push_str(&self.qualified_name());
515 sql.push('\n');
516
517 sql.push_str("ON ");
519 sql.push_str(&self.qualified_table());
520 sql.push('\n');
521
522 sql.push_str(self.timing.to_sql());
524 sql.push(' ');
525
526 let events: Vec<_> = self.events.iter().collect();
528 for (i, event) in events.iter().enumerate() {
529 if i > 0 {
530 sql.push_str(", ");
531 }
532 sql.push_str(event.to_sql());
533 }
534 sql.push('\n');
535
536 sql.push_str("AS\n");
538 sql.push_str("BEGIN\n");
539 sql.push_str(" SET NOCOUNT ON;\n");
540
541 match &self.action {
543 TriggerAction::InlineSql { statements } => {
544 for stmt in statements {
545 sql.push_str(" ");
546 sql.push_str(stmt);
547 sql.push_str(";\n");
548 }
549 }
550 TriggerAction::ExecuteFunction { name, args } => {
551 sql.push_str(" EXEC ");
552 sql.push_str(name);
553 if !args.is_empty() {
554 sql.push(' ');
555 sql.push_str(&args.join(", "));
556 }
557 sql.push_str(";\n");
558 }
559 TriggerAction::FunctionReference { name } => {
560 sql.push_str(" EXEC ");
561 sql.push_str(name);
562 sql.push_str(";\n");
563 }
564 }
565
566 sql.push_str("END;");
567
568 Ok(sql)
569 }
570
571 pub fn to_sql(&self, db_type: DatabaseType) -> QueryResult<String> {
573 match db_type {
574 DatabaseType::PostgreSQL => self.to_postgres_sql(),
575 DatabaseType::MySQL => self.to_mysql_sql(),
576 DatabaseType::SQLite => self.to_sqlite_sql(),
577 DatabaseType::MSSQL => self.to_mssql_sql(),
578 }
579 }
580
581 pub fn drop_sql(&self, db_type: DatabaseType) -> String {
583 match db_type {
584 DatabaseType::PostgreSQL => {
585 format!(
586 "DROP TRIGGER IF EXISTS {} ON {};",
587 self.name,
588 self.qualified_table()
589 )
590 }
591 DatabaseType::MySQL => {
592 format!("DROP TRIGGER IF EXISTS {};", self.name)
593 }
594 DatabaseType::SQLite => {
595 format!("DROP TRIGGER IF EXISTS {};", self.name)
596 }
597 DatabaseType::MSSQL => {
598 format!(
599 "DROP TRIGGER IF EXISTS {};",
600 self.qualified_name()
601 )
602 }
603 }
604 }
605}
606
/// Fluent builder for [`Trigger`].
///
/// The name is supplied up front; table, at least one event and an action must
/// be set before `build` succeeds.
#[derive(Debug, Clone)]
pub struct TriggerBuilder {
    // Trigger name given to `new`.
    name: String,
    // Optional schema qualifier.
    schema: Option<String>,
    // Target table or view; `None` until `on_table` / `on_view` is called.
    table: Option<String>,
    // Timing keyword; starts as `After`.
    timing: TriggerTiming,
    // Accumulated events; a set, so adding the same event twice is a no-op.
    events: HashSet<TriggerEvent>,
    // Granularity; starts as `Row`.
    level: TriggerLevel,
    // Optional `UPDATE OF` column list.
    update_of: Option<UpdateOf>,
    // Optional `WHEN` condition.
    condition: Option<TriggerCondition>,
    // Action; `None` until one of the `execute_*` methods is called.
    action: Option<TriggerAction>,
    // Enabled flag; starts as `true`.
    enabled: bool,
    // Optional free-form comment.
    comment: Option<String>,
}
622
623impl TriggerBuilder {
624 pub fn new(name: impl Into<String>) -> Self {
626 Self {
627 name: name.into(),
628 schema: None,
629 table: None,
630 timing: TriggerTiming::After,
631 events: HashSet::new(),
632 level: TriggerLevel::Row,
633 update_of: None,
634 condition: None,
635 action: None,
636 enabled: true,
637 comment: None,
638 }
639 }
640
641 pub fn schema(mut self, schema: impl Into<String>) -> Self {
643 self.schema = Some(schema.into());
644 self
645 }
646
647 pub fn on_table(mut self, table: impl Into<String>) -> Self {
649 self.table = Some(table.into());
650 self
651 }
652
653 pub fn on_view(self, view: impl Into<String>) -> Self {
655 self.on_table(view)
656 }
657
658 pub fn timing(mut self, timing: TriggerTiming) -> Self {
660 self.timing = timing;
661 self
662 }
663
664 pub fn before(self) -> Self {
666 self.timing(TriggerTiming::Before)
667 }
668
669 pub fn after(self) -> Self {
671 self.timing(TriggerTiming::After)
672 }
673
674 pub fn instead_of(self) -> Self {
676 self.timing(TriggerTiming::InsteadOf)
677 }
678
679 pub fn event(mut self, event: TriggerEvent) -> Self {
681 self.events.insert(event);
682 self
683 }
684
685 pub fn events(mut self, events: impl IntoIterator<Item = TriggerEvent>) -> Self {
687 self.events.extend(events);
688 self
689 }
690
691 pub fn on_insert(self) -> Self {
693 self.event(TriggerEvent::Insert)
694 }
695
696 pub fn on_update(self) -> Self {
698 self.event(TriggerEvent::Update)
699 }
700
701 pub fn on_delete(self) -> Self {
703 self.event(TriggerEvent::Delete)
704 }
705
706 pub fn on_truncate(self) -> Self {
708 self.event(TriggerEvent::Truncate)
709 }
710
711 pub fn level(mut self, level: TriggerLevel) -> Self {
713 self.level = level;
714 self
715 }
716
717 pub fn for_each_row(self) -> Self {
719 self.level(TriggerLevel::Row)
720 }
721
722 pub fn for_each_statement(self) -> Self {
724 self.level(TriggerLevel::Statement)
725 }
726
727 pub fn update_of(mut self, columns: impl IntoIterator<Item = impl Into<String>>) -> Self {
729 self.update_of = Some(UpdateOf::new(columns));
730 self
731 }
732
733 pub fn when(mut self, condition: TriggerCondition) -> Self {
735 self.condition = Some(condition);
736 self
737 }
738
739 pub fn when_expr(self, expression: impl Into<String>) -> Self {
741 self.when(TriggerCondition::new(expression))
742 }
743
744 pub fn execute_function(mut self, name: impl Into<String>) -> Self {
746 self.action = Some(TriggerAction::function(name));
747 self
748 }
749
750 pub fn execute_function_with_args(
752 mut self,
753 name: impl Into<String>,
754 args: impl IntoIterator<Item = impl Into<String>>,
755 ) -> Self {
756 self.action = Some(TriggerAction::function_with_args(name, args));
757 self
758 }
759
760 pub fn execute_sql(mut self, statements: impl IntoIterator<Item = impl Into<String>>) -> Self {
762 self.action = Some(TriggerAction::inline_sql(statements));
763 self
764 }
765
766 pub fn enabled(mut self, enabled: bool) -> Self {
768 self.enabled = enabled;
769 self
770 }
771
772 pub fn comment(mut self, comment: impl Into<String>) -> Self {
774 self.comment = Some(comment.into());
775 self
776 }
777
778 pub fn build(self) -> QueryResult<Trigger> {
780 let table = self.table.ok_or_else(|| {
781 QueryError::invalid_input("table", "Trigger must specify a table with on_table()")
782 })?;
783
784 if self.events.is_empty() {
785 return Err(QueryError::invalid_input(
786 "events",
787 "Trigger must have at least one event (on_insert, on_update, on_delete)",
788 ));
789 }
790
791 let action = self.action.ok_or_else(|| {
792 QueryError::invalid_input(
793 "action",
794 "Trigger must have an action (execute_function or execute_sql)",
795 )
796 })?;
797
798 Ok(Trigger {
799 name: self.name,
800 schema: self.schema,
801 table,
802 timing: self.timing,
803 events: self.events,
804 level: self.level,
805 update_of: self.update_of,
806 condition: self.condition,
807 action,
808 enabled: self.enabled,
809 comment: self.comment,
810 })
811 }
812}
813
814pub mod patterns {
816 use super::*;
817
818 pub fn audit_trigger(
826 table: &str,
827 audit_table: &str,
828 events: impl IntoIterator<Item = TriggerEvent>,
829 ) -> TriggerBuilder {
830 let _ = audit_table; Trigger::builder(format!("{}_audit_trigger", table))
835 .on_table(table)
836 .after()
837 .events(events)
838 .for_each_row()
839 .execute_function("audit_trigger_func")
840 }
841
842 pub fn soft_delete_trigger(table: &str, deleted_at_column: &str) -> TriggerBuilder {
844 Trigger::builder(format!("{}_soft_delete", table))
845 .on_table(table)
846 .instead_of()
847 .on_delete()
848 .for_each_row()
849 .execute_sql([format!(
850 "UPDATE {} SET {} = NOW() WHERE id = OLD.id",
851 table, deleted_at_column
852 )])
853 }
854
855 pub fn updated_at_trigger(table: &str, column: &str) -> TriggerBuilder {
857 Trigger::builder(format!("{}_updated_at", table))
858 .on_table(table)
859 .before()
860 .on_update()
861 .for_each_row()
862 .execute_sql([format!("NEW.{} = NOW()", column)])
863 }
864
865 pub fn validation_trigger(
867 table: &str,
868 name: &str,
869 condition: &str,
870 error_message: &str,
871 ) -> TriggerBuilder {
872 Trigger::builder(name)
873 .on_table(table)
874 .before()
875 .on_insert()
876 .on_update()
877 .for_each_row()
878 .when_expr(condition)
879 .execute_sql([format!("RAISE EXCEPTION '{}'", error_message)])
880 }
881}
882
883pub mod mongodb {
885 use super::*;
886
887 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
889 pub enum ChangeType {
890 Insert,
892 Update,
894 Replace,
896 Delete,
898 Drop,
900 Rename,
902 DropDatabase,
904 Invalidate,
906 }
907
908 impl ChangeType {
909 pub fn as_str(&self) -> &'static str {
911 match self {
912 Self::Insert => "insert",
913 Self::Update => "update",
914 Self::Replace => "replace",
915 Self::Delete => "delete",
916 Self::Drop => "drop",
917 Self::Rename => "rename",
918 Self::DropDatabase => "dropDatabase",
919 Self::Invalidate => "invalidate",
920 }
921 }
922 }
923
924 #[derive(Debug, Clone, Default, Serialize, Deserialize)]
926 pub struct ChangeStreamOptions {
927 pub resume_after: Option<String>,
929 pub start_at_operation_time: Option<String>,
931 pub full_document: FullDocument,
933 pub full_document_before_change: FullDocumentBeforeChange,
935 pub max_await_time_ms: Option<u64>,
937 pub batch_size: Option<u32>,
939 }
940
941 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
943 pub enum FullDocument {
944 #[default]
946 Default,
947 UpdateLookup,
949 WhenAvailable,
951 Required,
953 }
954
955 impl FullDocument {
956 pub fn as_str(&self) -> &'static str {
958 match self {
959 Self::Default => "default",
960 Self::UpdateLookup => "updateLookup",
961 Self::WhenAvailable => "whenAvailable",
962 Self::Required => "required",
963 }
964 }
965 }
966
967 #[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
969 pub enum FullDocumentBeforeChange {
970 #[default]
972 Off,
973 WhenAvailable,
975 Required,
977 }
978
979 impl FullDocumentBeforeChange {
980 pub fn as_str(&self) -> &'static str {
982 match self {
983 Self::Off => "off",
984 Self::WhenAvailable => "whenAvailable",
985 Self::Required => "required",
986 }
987 }
988 }
989
990 #[derive(Debug, Clone, Serialize, Deserialize)]
992 pub struct ChangeStreamPipeline {
993 pub stages: Vec<PipelineStage>,
995 }
996
997 impl ChangeStreamPipeline {
998 pub fn new() -> Self {
1000 Self { stages: Vec::new() }
1001 }
1002
1003 pub fn match_stage(mut self, filter: serde_json::Value) -> Self {
1005 self.stages.push(PipelineStage::Match(filter));
1006 self
1007 }
1008
1009 pub fn operation_types(self, types: &[ChangeType]) -> Self {
1011 let type_strs: Vec<_> = types.iter().map(|t| t.as_str()).collect();
1012 self.match_stage(serde_json::json!({
1013 "operationType": { "$in": type_strs }
1014 }))
1015 }
1016
1017 pub fn namespace(self, db: &str, collection: &str) -> Self {
1019 self.match_stage(serde_json::json!({
1020 "ns": { "db": db, "coll": collection }
1021 }))
1022 }
1023
1024 pub fn project(mut self, projection: serde_json::Value) -> Self {
1026 self.stages.push(PipelineStage::Project(projection));
1027 self
1028 }
1029 }
1030
1031 impl Default for ChangeStreamPipeline {
1032 fn default() -> Self {
1033 Self::new()
1034 }
1035 }
1036
1037 #[derive(Debug, Clone, Serialize, Deserialize)]
1039 pub enum PipelineStage {
1040 Match(serde_json::Value),
1042 Project(serde_json::Value),
1044 AddFields(serde_json::Value),
1046 ReplaceRoot(serde_json::Value),
1048 Redact(serde_json::Value),
1050 }
1051
1052 #[derive(Debug, Clone, Default)]
1054 pub struct ChangeStreamBuilder {
1055 collection: Option<String>,
1056 database: Option<String>,
1057 pipeline: ChangeStreamPipeline,
1058 options: ChangeStreamOptions,
1059 }
1060
1061 impl ChangeStreamBuilder {
1062 pub fn new() -> Self {
1064 Self::default()
1065 }
1066
1067 pub fn collection(mut self, name: impl Into<String>) -> Self {
1069 self.collection = Some(name.into());
1070 self
1071 }
1072
1073 pub fn database(mut self, name: impl Into<String>) -> Self {
1075 self.database = Some(name.into());
1076 self
1077 }
1078
1079 pub fn operations(mut self, types: &[ChangeType]) -> Self {
1081 self.pipeline = self.pipeline.operation_types(types);
1082 self
1083 }
1084
1085 pub fn filter(mut self, filter: serde_json::Value) -> Self {
1087 self.pipeline = self.pipeline.match_stage(filter);
1088 self
1089 }
1090
1091 pub fn full_document(mut self, policy: FullDocument) -> Self {
1093 self.options.full_document = policy;
1094 self
1095 }
1096
1097 pub fn full_document_before_change(mut self, policy: FullDocumentBeforeChange) -> Self {
1099 self.options.full_document_before_change = policy;
1100 self
1101 }
1102
1103 pub fn resume_after(mut self, token: impl Into<String>) -> Self {
1105 self.options.resume_after = Some(token.into());
1106 self
1107 }
1108
1109 pub fn max_await_time_ms(mut self, ms: u64) -> Self {
1111 self.options.max_await_time_ms = Some(ms);
1112 self
1113 }
1114
1115 pub fn batch_size(mut self, size: u32) -> Self {
1117 self.options.batch_size = Some(size);
1118 self
1119 }
1120
1121 pub fn build_pipeline(&self) -> &[PipelineStage] {
1123 &self.pipeline.stages
1124 }
1125
1126 pub fn build_options(&self) -> &ChangeStreamOptions {
1128 &self.options
1129 }
1130 }
1131}
1132
// Unit tests for the trigger builder, the per-dialect SQL generators and the
// MongoDB change-stream helpers.
#[cfg(test)]
mod tests {
    use super::*;

    // The builder carries name, table, timing, events and level into the
    // finished trigger.
    #[test]
    fn test_trigger_builder() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_update()
            .on_delete()
            .for_each_row()
            .execute_function("audit_log_changes")
            .build()
            .unwrap();

        assert_eq!(trigger.name, "audit_users");
        assert_eq!(trigger.table, "users");
        assert_eq!(trigger.timing, TriggerTiming::After);
        assert!(trigger.events.contains(&TriggerEvent::Update));
        assert!(trigger.events.contains(&TriggerEvent::Delete));
        assert_eq!(trigger.level, TriggerLevel::Row);
    }

    // PostgreSQL output contains each expected clause; the assertions do not
    // depend on the relative order of the two events.
    #[test]
    fn test_postgres_trigger_sql() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_insert()
            .on_update()
            .for_each_row()
            .execute_function("audit_func")
            .build()
            .unwrap();

        let sql = trigger.to_postgres_sql().unwrap();
        assert!(sql.contains("CREATE TRIGGER audit_users"));
        assert!(sql.contains("AFTER"));
        assert!(sql.contains("ON users"));
        assert!(sql.contains("FOR EACH ROW"));
        assert!(sql.contains("EXECUTE FUNCTION audit_func()"));
    }

    // MySQL output for a single-event trigger with an inline statement; the
    // table name is backtick-quoted.
    #[test]
    fn test_mysql_trigger_sql() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_insert()
            .for_each_row()
            .execute_sql(["INSERT INTO audit_log VALUES (NEW.id, 'INSERT')"])
            .build()
            .unwrap();

        let sql = trigger.to_mysql_sql().unwrap();
        assert!(sql.contains("CREATE TRIGGER audit_users"));
        assert!(sql.contains("AFTER INSERT"));
        assert!(sql.contains("ON `users`"));
        assert!(sql.contains("FOR EACH ROW"));
    }

    // The MySQL generator rejects a trigger with more than one event.
    #[test]
    fn test_mysql_multiple_events_error() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_insert()
            .on_update()
            .execute_sql(["SELECT 1"])
            .build()
            .unwrap();

        let result = trigger.to_mysql_sql();
        assert!(result.is_err());
    }

    // SQLite output includes the unparenthesised WHEN clause and a
    // BEGIN ... END; body.
    #[test]
    fn test_sqlite_trigger_sql() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_delete()
            .for_each_row()
            .when_expr("OLD.important = 1")
            .execute_sql(["INSERT INTO deleted_users SELECT * FROM OLD"])
            .build()
            .unwrap();

        let sql = trigger.to_sqlite_sql().unwrap();
        assert!(sql.contains("CREATE TRIGGER audit_users"));
        assert!(sql.contains("AFTER DELETE"));
        assert!(sql.contains("ON `users`"));
        assert!(sql.contains("WHEN OLD.important = 1"));
        assert!(sql.contains("BEGIN"));
        assert!(sql.contains("END;"));
    }

    // SQL Server output qualifies both names with the schema. Either event
    // order is accepted because the events are stored in a HashSet.
    #[test]
    fn test_mssql_trigger_sql() {
        let trigger = Trigger::builder("audit_users")
            .schema("dbo")
            .on_table("users")
            .after()
            .on_insert()
            .on_update()
            .execute_sql(["INSERT INTO audit_log SELECT * FROM inserted"])
            .build()
            .unwrap();

        let sql = trigger.to_mssql_sql().unwrap();
        assert!(sql.contains("CREATE TRIGGER dbo.audit_users"));
        assert!(sql.contains("ON dbo.users"));
        assert!(sql.contains("AFTER INSERT, UPDATE") || sql.contains("AFTER UPDATE, INSERT"));
        assert!(sql.contains("SET NOCOUNT ON"));
    }

    // The SQL Server generator rejects BEFORE triggers.
    #[test]
    fn test_mssql_before_error() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .before()
            .on_insert()
            .execute_sql(["SELECT 1"])
            .build()
            .unwrap();

        let result = trigger.to_mssql_sql();
        assert!(result.is_err());
    }

    // DROP statements: PostgreSQL names the table, MySQL does not.
    #[test]
    fn test_drop_trigger_sql() {
        let trigger = Trigger::builder("audit_users")
            .on_table("users")
            .after()
            .on_insert()
            .execute_function("audit_func")
            .build()
            .unwrap();

        let pg_drop = trigger.drop_sql(DatabaseType::PostgreSQL);
        assert_eq!(pg_drop, "DROP TRIGGER IF EXISTS audit_users ON users;");

        let mysql_drop = trigger.drop_sql(DatabaseType::MySQL);
        assert_eq!(mysql_drop, "DROP TRIGGER IF EXISTS audit_users;");
    }

    // Combining conditions with `and` keeps both operand expressions.
    #[test]
    fn test_trigger_condition() {
        let cond = TriggerCondition::column_changed("email")
            .and(TriggerCondition::new_not_null("verified"));

        assert!(cond.expression.contains("OLD.email IS DISTINCT FROM NEW.email"));
        assert!(cond.expression.contains("NEW.verified IS NOT NULL"));
    }

    // The column list renders with a leading space and comma separators.
    #[test]
    fn test_update_of() {
        let update_of = UpdateOf::new(["email", "password"]);
        assert_eq!(update_of.to_sql(), " OF email, password");
    }

    // The column list is attached directly to the UPDATE event in PostgreSQL
    // output.
    #[test]
    fn test_trigger_with_update_of() {
        let trigger = Trigger::builder("sensitive_update")
            .on_table("users")
            .before()
            .on_update()
            .update_of(["email", "password"])
            .execute_function("validate_sensitive_update")
            .build()
            .unwrap();

        let sql = trigger.to_postgres_sql().unwrap();
        assert!(sql.contains("UPDATE OF email, password"));
    }

    // INSTEAD OF timing on a view renders in PostgreSQL output.
    #[test]
    fn test_instead_of_trigger() {
        let trigger = Trigger::builder("view_insert")
            .on_view("user_view")
            .instead_of()
            .on_insert()
            .execute_function("handle_view_insert")
            .build()
            .unwrap();

        let sql = trigger.to_postgres_sql().unwrap();
        assert!(sql.contains("INSTEAD OF INSERT"));
    }

    // `build` fails when no table was set.
    #[test]
    fn test_missing_table_error() {
        let result = Trigger::builder("test")
            .on_insert()
            .execute_function("func")
            .build();

        assert!(result.is_err());
    }

    // `build` fails when no event was added.
    #[test]
    fn test_missing_events_error() {
        let result = Trigger::builder("test")
            .on_table("users")
            .execute_function("func")
            .build();

        assert!(result.is_err());
    }

    // `build` fails when no action was set.
    #[test]
    fn test_missing_action_error() {
        let result = Trigger::builder("test")
            .on_table("users")
            .on_insert()
            .build();

        assert!(result.is_err());
    }

    // Tests for the MongoDB change-stream helpers.
    mod mongodb_tests {
        use super::super::mongodb::*;

        // Option setters on the builder are visible through `build_options`.
        #[test]
        fn test_change_stream_builder() {
            let builder = ChangeStreamBuilder::new()
                .collection("users")
                .operations(&[ChangeType::Insert, ChangeType::Update])
                .full_document(FullDocument::UpdateLookup)
                .batch_size(100);

            assert_eq!(builder.build_options().full_document, FullDocument::UpdateLookup);
            assert_eq!(builder.build_options().batch_size, Some(100));
        }

        // Operation types map to their lower-case names.
        #[test]
        fn test_change_type() {
            assert_eq!(ChangeType::Insert.as_str(), "insert");
            assert_eq!(ChangeType::Update.as_str(), "update");
            assert_eq!(ChangeType::Delete.as_str(), "delete");
        }

        // Document policies map to their lower-camel-case option values.
        #[test]
        fn test_full_document_options() {
            assert_eq!(FullDocument::Default.as_str(), "default");
            assert_eq!(FullDocument::UpdateLookup.as_str(), "updateLookup");
            assert_eq!(FullDocumentBeforeChange::Required.as_str(), "required");
        }
    }
}
1385