1use std::collections::HashMap;
30
31use serde::{Deserialize, Serialize};
32
/// Describes a stored routine (function or procedure) so it can be diffed and rendered as SQL.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcedureDefinition {
    /// Routine name without schema qualification.
    pub name: String,
    /// Optional schema; `qualified_name()` renders `schema.name` when set.
    pub schema: Option<String>,
    /// `true` for a FUNCTION, `false` for a PROCEDURE.
    pub is_function: bool,
    /// Declared parameters, in declaration order.
    pub parameters: Vec<ProcedureParameter>,
    /// Return type name, if any.
    pub return_type: Option<String>,
    /// Whether the routine returns a set (`returns_setof` and `returns_table` set this).
    pub returns_set: bool,
    /// Columns of a table-returning routine (set by `returns_table`).
    pub return_columns: Vec<ReturnColumn>,
    /// Implementation language.
    pub language: ProcedureLanguage,
    /// Routine body text.
    pub body: String,
    /// Volatility classification.
    pub volatility: Volatility,
    /// Whether the routine is marked SECURITY DEFINER.
    pub security_definer: bool,
    /// Optional planner cost estimate.
    pub cost: Option<i32>,
    /// Optional row estimate.
    /// NOTE(review): no builder or generator visible in this part of the file reads it.
    pub rows: Option<i32>,
    /// Parallel-safety classification.
    pub parallel: ParallelSafety,
    /// Whether generated SQL may use `CREATE OR REPLACE`.
    pub or_replace: bool,
    /// Optional comment attached to the routine.
    pub comment: Option<String>,
    /// Hex-encoded hash of `body`, refreshed by the `body()` builder.
    pub checksum: Option<String>,
    /// Optional version number.
    /// NOTE(review): not read by any code visible in this part of the file.
    pub version: Option<i32>,
}
77
78impl Default for ProcedureDefinition {
79 fn default() -> Self {
80 Self {
81 name: String::new(),
82 schema: None,
83 is_function: true,
84 parameters: Vec::new(),
85 return_type: None,
86 returns_set: false,
87 return_columns: Vec::new(),
88 language: ProcedureLanguage::Sql,
89 body: String::new(),
90 volatility: Volatility::Volatile,
91 security_definer: false,
92 cost: None,
93 rows: None,
94 parallel: ParallelSafety::Unsafe,
95 or_replace: true,
96 comment: None,
97 checksum: None,
98 version: None,
99 }
100 }
101}
102
103impl ProcedureDefinition {
104 pub fn new(name: impl Into<String>) -> Self {
106 Self {
107 name: name.into(),
108 ..Default::default()
109 }
110 }
111
112 pub fn function(name: impl Into<String>) -> Self {
114 Self {
115 name: name.into(),
116 is_function: true,
117 ..Default::default()
118 }
119 }
120
121 pub fn procedure(name: impl Into<String>) -> Self {
123 Self {
124 name: name.into(),
125 is_function: false,
126 ..Default::default()
127 }
128 }
129
130 pub fn schema(mut self, schema: impl Into<String>) -> Self {
132 self.schema = Some(schema.into());
133 self
134 }
135
136 pub fn param(mut self, name: impl Into<String>, data_type: impl Into<String>) -> Self {
138 self.parameters.push(ProcedureParameter {
139 name: name.into(),
140 data_type: data_type.into(),
141 mode: ParameterMode::In,
142 default: None,
143 });
144 self
145 }
146
147 pub fn out_param(mut self, name: impl Into<String>, data_type: impl Into<String>) -> Self {
149 self.parameters.push(ProcedureParameter {
150 name: name.into(),
151 data_type: data_type.into(),
152 mode: ParameterMode::Out,
153 default: None,
154 });
155 self
156 }
157
158 pub fn inout_param(mut self, name: impl Into<String>, data_type: impl Into<String>) -> Self {
160 self.parameters.push(ProcedureParameter {
161 name: name.into(),
162 data_type: data_type.into(),
163 mode: ParameterMode::InOut,
164 default: None,
165 });
166 self
167 }
168
169 pub fn returns(mut self, return_type: impl Into<String>) -> Self {
171 self.return_type = Some(return_type.into());
172 self
173 }
174
175 pub fn returns_setof(mut self, return_type: impl Into<String>) -> Self {
177 self.return_type = Some(return_type.into());
178 self.returns_set = true;
179 self
180 }
181
182 pub fn returns_table(mut self, columns: Vec<ReturnColumn>) -> Self {
184 self.returns_set = true;
185 self.return_columns = columns;
186 self
187 }
188
189 pub fn language(mut self, language: ProcedureLanguage) -> Self {
191 self.language = language;
192 self
193 }
194
195 pub fn body(mut self, body: impl Into<String>) -> Self {
197 self.body = body.into();
198 self.update_checksum();
199 self
200 }
201
202 pub fn volatility(mut self, volatility: Volatility) -> Self {
204 self.volatility = volatility;
205 self
206 }
207
208 pub fn immutable(mut self) -> Self {
210 self.volatility = Volatility::Immutable;
211 self
212 }
213
214 pub fn stable(mut self) -> Self {
216 self.volatility = Volatility::Stable;
217 self
218 }
219
220 pub fn security_definer(mut self) -> Self {
222 self.security_definer = true;
223 self
224 }
225
226 pub fn cost(mut self, cost: i32) -> Self {
228 self.cost = Some(cost);
229 self
230 }
231
232 pub fn parallel(mut self, parallel: ParallelSafety) -> Self {
234 self.parallel = parallel;
235 self
236 }
237
238 pub fn comment(mut self, comment: impl Into<String>) -> Self {
240 self.comment = Some(comment.into());
241 self
242 }
243
244 fn update_checksum(&mut self) {
246 use std::collections::hash_map::DefaultHasher;
247 use std::hash::{Hash, Hasher};
248
249 let mut hasher = DefaultHasher::new();
250 self.body.hash(&mut hasher);
251 self.checksum = Some(format!("{:016x}", hasher.finish()));
252 }
253
254 pub fn qualified_name(&self) -> String {
256 match &self.schema {
257 Some(schema) => format!("{}.{}", schema, self.name),
258 None => self.name.clone(),
259 }
260 }
261
262 pub fn has_changed(&self, other: &ProcedureDefinition) -> bool {
264 if let (Some(a), Some(b)) = (&self.checksum, &other.checksum) {
266 if a != b {
267 return true;
268 }
269 }
270
271 self.body != other.body
273 || self.parameters != other.parameters
274 || self.return_type != other.return_type
275 || self.returns_set != other.returns_set
276 || self.language != other.language
277 || self.volatility != other.volatility
278 || self.security_definer != other.security_definer
279 }
280}
281
/// A single parameter of a stored routine.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcedureParameter {
    /// Parameter name.
    pub name: String,
    /// SQL type name, written verbatim into generated SQL.
    pub data_type: String,
    /// Direction of the parameter (IN, OUT, INOUT, VARIADIC).
    pub mode: ParameterMode,
    /// Optional default expression; the PostgreSQL generator renders it after `DEFAULT`.
    pub default: Option<String>,
}
294
/// Direction of a routine parameter; `In` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ParameterMode {
    /// Input-only parameter.
    #[default]
    In,
    /// Output-only parameter.
    Out,
    /// Parameter used for both input and output.
    InOut,
    /// Variadic parameter (rendered as `VARIADIC` by the PostgreSQL generator).
    Variadic,
}
304
/// One column of a table-returning routine's result.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ReturnColumn {
    /// Column name.
    pub name: String,
    /// SQL type name of the column.
    pub data_type: String,
}
313
314impl ReturnColumn {
315 pub fn new(name: impl Into<String>, data_type: impl Into<String>) -> Self {
317 Self {
318 name: name.into(),
319 data_type: data_type.into(),
320 }
321 }
322}
323
/// Implementation language of a routine; `Sql` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ProcedureLanguage {
    /// Plain SQL.
    #[default]
    Sql,
    /// PL/pgSQL.
    PlPgSql,
    /// PL/Python (rendered as `plpython3u`).
    PlPython,
    /// PL/Perl.
    PlPerl,
    /// PL/Tcl.
    PlTcl,
    /// PL/v8.
    PlV8,
    /// C.
    C,
}
336
337impl ProcedureLanguage {
338 pub fn to_sql(&self) -> &'static str {
340 match self {
341 Self::Sql => "SQL",
342 Self::PlPgSql => "plpgsql",
343 Self::PlPython => "plpython3u",
344 Self::PlPerl => "plperl",
345 Self::PlTcl => "pltcl",
346 Self::PlV8 => "plv8",
347 Self::C => "C",
348 }
349 }
350}
351
/// Volatility classification of a routine; `Volatile` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum Volatility {
    /// Rendered as `VOLATILE`.
    #[default]
    Volatile,
    /// Rendered as `STABLE`.
    Stable,
    /// Rendered as `IMMUTABLE`; the MySQL generator maps it to `DETERMINISTIC`.
    Immutable,
}
360
361impl Volatility {
362 pub fn to_sql(&self) -> &'static str {
364 match self {
365 Self::Volatile => "VOLATILE",
366 Self::Stable => "STABLE",
367 Self::Immutable => "IMMUTABLE",
368 }
369 }
370}
371
/// Parallel-safety classification of a routine; `Unsafe` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum ParallelSafety {
    /// Rendered as `PARALLEL UNSAFE`.
    #[default]
    Unsafe,
    /// Rendered as `PARALLEL RESTRICTED`.
    Restricted,
    /// Rendered as `PARALLEL SAFE`.
    Safe,
}
380
381impl ParallelSafety {
382 pub fn to_sql(&self) -> &'static str {
384 match self {
385 Self::Unsafe => "PARALLEL UNSAFE",
386 Self::Restricted => "PARALLEL RESTRICTED",
387 Self::Safe => "PARALLEL SAFE",
388 }
389 }
390}
391
/// Describes a table trigger so it can be diffed and rendered as SQL.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerDefinition {
    /// Trigger name without schema qualification.
    pub name: String,
    /// Optional schema; `qualified_name()` renders `schema.name` when set.
    pub schema: Option<String>,
    /// Table the trigger is attached to.
    pub table: String,
    /// When the trigger fires relative to the event (BEFORE / AFTER / INSTEAD OF).
    pub timing: TriggerTiming,
    /// Events that fire the trigger.
    pub events: Vec<TriggerEvent>,
    /// Row-level or statement-level firing.
    pub level: TriggerLevel,
    /// Optional `WHEN` condition, written verbatim into generated SQL.
    pub condition: Option<String>,
    /// Name of the routine the trigger invokes.
    pub function: String,
    /// Arguments passed to the routine (used by the PostgreSQL generator).
    pub function_args: Vec<String>,
    /// Whether generated SQL may use `CREATE OR REPLACE` (used by the PostgreSQL generator).
    pub or_replace: bool,
    /// Optional comment.
    /// NOTE(review): no generator visible in this part of the file emits it.
    pub comment: Option<String>,
    /// Optional checksum.
    /// NOTE(review): never assigned by code visible in this part of the file.
    pub checksum: Option<String>,
}
424
425impl Default for TriggerDefinition {
426 fn default() -> Self {
427 Self {
428 name: String::new(),
429 schema: None,
430 table: String::new(),
431 timing: TriggerTiming::Before,
432 events: vec![TriggerEvent::Insert],
433 level: TriggerLevel::Row,
434 condition: None,
435 function: String::new(),
436 function_args: Vec::new(),
437 or_replace: true,
438 comment: None,
439 checksum: None,
440 }
441 }
442}
443
444impl TriggerDefinition {
445 pub fn new(name: impl Into<String>, table: impl Into<String>) -> Self {
447 Self {
448 name: name.into(),
449 table: table.into(),
450 ..Default::default()
451 }
452 }
453
454 pub fn before(mut self) -> Self {
456 self.timing = TriggerTiming::Before;
457 self
458 }
459
460 pub fn after(mut self) -> Self {
462 self.timing = TriggerTiming::After;
463 self
464 }
465
466 pub fn instead_of(mut self) -> Self {
468 self.timing = TriggerTiming::InsteadOf;
469 self
470 }
471
472 pub fn on(mut self, events: Vec<TriggerEvent>) -> Self {
474 self.events = events;
475 self
476 }
477
478 pub fn for_each_row(mut self) -> Self {
480 self.level = TriggerLevel::Row;
481 self
482 }
483
484 pub fn for_each_statement(mut self) -> Self {
486 self.level = TriggerLevel::Statement;
487 self
488 }
489
490 pub fn when(mut self, condition: impl Into<String>) -> Self {
492 self.condition = Some(condition.into());
493 self
494 }
495
496 pub fn execute(mut self, function: impl Into<String>) -> Self {
498 self.function = function.into();
499 self
500 }
501
502 pub fn qualified_name(&self) -> String {
504 match &self.schema {
505 Some(schema) => format!("{}.{}", schema, self.name),
506 None => self.name.clone(),
507 }
508 }
509}
510
/// When a trigger fires relative to its event; `Before` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum TriggerTiming {
    /// Rendered as `BEFORE`.
    #[default]
    Before,
    /// Rendered as `AFTER`.
    After,
    /// Rendered as `INSTEAD OF`.
    InsteadOf,
}
519
520impl TriggerTiming {
521 pub fn to_sql(&self) -> &'static str {
523 match self {
524 Self::Before => "BEFORE",
525 Self::After => "AFTER",
526 Self::InsteadOf => "INSTEAD OF",
527 }
528 }
529}
530
/// A table event that can fire a trigger.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TriggerEvent {
    /// Rendered as `INSERT`.
    Insert,
    /// Rendered as `UPDATE`.
    Update,
    /// Rendered as `DELETE`.
    Delete,
    /// Rendered as `TRUNCATE`.
    Truncate,
}
539
540impl TriggerEvent {
541 pub fn to_sql(&self) -> &'static str {
543 match self {
544 Self::Insert => "INSERT",
545 Self::Update => "UPDATE",
546 Self::Delete => "DELETE",
547 Self::Truncate => "TRUNCATE",
548 }
549 }
550}
551
/// Granularity at which a trigger fires; `Row` is the default.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum TriggerLevel {
    /// Rendered as `FOR EACH ROW`.
    #[default]
    Row,
    /// Rendered as `FOR EACH STATEMENT`.
    Statement,
}
559
560impl TriggerLevel {
561 pub fn to_sql(&self) -> &'static str {
563 match self {
564 Self::Row => "FOR EACH ROW",
565 Self::Statement => "FOR EACH STATEMENT",
566 }
567 }
568}
569
/// The set of routine and trigger changes needed to go from one schema state to another.
#[derive(Debug, Clone, Default)]
pub struct ProcedureDiff {
    /// Routines that exist only in the target state.
    pub create: Vec<ProcedureDefinition>,
    /// Qualified names of routines that exist only in the source state.
    pub drop: Vec<String>,
    /// Routines present in both states whose definitions differ.
    pub alter: Vec<ProcedureAlterDiff>,
    /// Triggers that exist only in the target state.
    pub create_triggers: Vec<TriggerDefinition>,
    /// Qualified names of triggers that exist only in the source state.
    pub drop_triggers: Vec<String>,
    /// Triggers present in both states whose definitions differ.
    pub alter_triggers: Vec<TriggerAlterDiff>,
}
590
/// A routine whose definition changed, with both versions and the detected change kinds.
#[derive(Debug, Clone)]
pub struct ProcedureAlterDiff {
    /// Definition in the source state.
    pub old: ProcedureDefinition,
    /// Definition in the target state.
    pub new: ProcedureDefinition,
    /// Which aspects differ between `old` and `new`.
    pub changes: Vec<ProcedureChange>,
}
601
/// An aspect of a routine definition that differs between two versions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProcedureChange {
    /// The body text differs.
    Body,
    /// The parameter list differs.
    Parameters,
    /// The return type or the set-returning flag differs.
    ReturnType,
    /// The implementation language differs.
    Language,
    /// The volatility classification differs.
    Volatility,
    /// The SECURITY DEFINER flag differs.
    SecurityDefiner,
    /// The cost estimate differs.
    Cost,
    /// The parallel-safety classification differs.
    Parallel,
}
614
/// A trigger whose definition changed, with both versions.
#[derive(Debug, Clone)]
pub struct TriggerAlterDiff {
    /// Definition in the source state.
    pub old: TriggerDefinition,
    /// Definition in the target state.
    pub new: TriggerDefinition,
}
623
624impl ProcedureDiff {
625 pub fn is_empty(&self) -> bool {
627 self.create.is_empty()
628 && self.drop.is_empty()
629 && self.alter.is_empty()
630 && self.create_triggers.is_empty()
631 && self.drop_triggers.is_empty()
632 && self.alter_triggers.is_empty()
633 }
634
635 pub fn summary(&self) -> String {
637 let mut parts = Vec::new();
638
639 if !self.create.is_empty() {
640 parts.push(format!("Create {} procedures", self.create.len()));
641 }
642 if !self.drop.is_empty() {
643 parts.push(format!("Drop {} procedures", self.drop.len()));
644 }
645 if !self.alter.is_empty() {
646 parts.push(format!("Alter {} procedures", self.alter.len()));
647 }
648 if !self.create_triggers.is_empty() {
649 parts.push(format!("Create {} triggers", self.create_triggers.len()));
650 }
651 if !self.drop_triggers.is_empty() {
652 parts.push(format!("Drop {} triggers", self.drop_triggers.len()));
653 }
654 if !self.alter_triggers.is_empty() {
655 parts.push(format!("Alter {} triggers", self.alter_triggers.len()));
656 }
657
658 if parts.is_empty() {
659 "No changes".to_string()
660 } else {
661 parts.join(", ")
662 }
663 }
664}
665
666pub struct ProcedureDiffer;
668
669impl ProcedureDiffer {
670 pub fn diff(
672 from: &[ProcedureDefinition],
673 to: &[ProcedureDefinition],
674 ) -> ProcedureDiff {
675 let mut diff = ProcedureDiff::default();
676
677 let from_map: HashMap<_, _> = from.iter().map(|p| (p.qualified_name(), p)).collect();
678 let to_map: HashMap<_, _> = to.iter().map(|p| (p.qualified_name(), p)).collect();
679
680 for (name, proc) in &to_map {
682 if !from_map.contains_key(name) {
683 diff.create.push((*proc).clone());
684 }
685 }
686
687 for (name, _) in &from_map {
689 if !to_map.contains_key(name) {
690 diff.drop.push(name.clone());
691 }
692 }
693
694 for (name, new_proc) in &to_map {
696 if let Some(old_proc) = from_map.get(name) {
697 if old_proc.has_changed(new_proc) {
698 let changes = detect_procedure_changes(old_proc, new_proc);
699 diff.alter.push(ProcedureAlterDiff {
700 old: (*old_proc).clone(),
701 new: (*new_proc).clone(),
702 changes,
703 });
704 }
705 }
706 }
707
708 diff
709 }
710
711 pub fn diff_triggers(
713 from: &[TriggerDefinition],
714 to: &[TriggerDefinition],
715 ) -> ProcedureDiff {
716 let mut diff = ProcedureDiff::default();
717
718 let from_map: HashMap<_, _> = from.iter().map(|t| (t.qualified_name(), t)).collect();
719 let to_map: HashMap<_, _> = to.iter().map(|t| (t.qualified_name(), t)).collect();
720
721 for (name, trigger) in &to_map {
723 if !from_map.contains_key(name) {
724 diff.create_triggers.push((*trigger).clone());
725 }
726 }
727
728 for (name, _) in &from_map {
730 if !to_map.contains_key(name) {
731 diff.drop_triggers.push(name.clone());
732 }
733 }
734
735 for (name, new_trigger) in &to_map {
737 if let Some(old_trigger) = from_map.get(name) {
738 if old_trigger != new_trigger {
739 diff.alter_triggers.push(TriggerAlterDiff {
740 old: (*old_trigger).clone(),
741 new: (*new_trigger).clone(),
742 });
743 }
744 }
745 }
746
747 diff
748 }
749}
750
751fn detect_procedure_changes(
752 old: &ProcedureDefinition,
753 new: &ProcedureDefinition,
754) -> Vec<ProcedureChange> {
755 let mut changes = Vec::new();
756
757 if old.body != new.body {
758 changes.push(ProcedureChange::Body);
759 }
760 if old.parameters != new.parameters {
761 changes.push(ProcedureChange::Parameters);
762 }
763 if old.return_type != new.return_type || old.returns_set != new.returns_set {
764 changes.push(ProcedureChange::ReturnType);
765 }
766 if old.language != new.language {
767 changes.push(ProcedureChange::Language);
768 }
769 if old.volatility != new.volatility {
770 changes.push(ProcedureChange::Volatility);
771 }
772 if old.security_definer != new.security_definer {
773 changes.push(ProcedureChange::SecurityDefiner);
774 }
775 if old.cost != new.cost {
776 changes.push(ProcedureChange::Cost);
777 }
778 if old.parallel != new.parallel {
779 changes.push(ProcedureChange::Parallel);
780 }
781
782 changes
783}
784
/// Renders routine and trigger definitions as SQL text for one database dialect.
pub struct ProcedureSqlGenerator {
    /// Dialect that every generated statement targets.
    pub db_type: DatabaseType,
}
794
/// SQL dialects the generator can target.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DatabaseType {
    /// PostgreSQL.
    PostgreSQL,
    /// MySQL.
    MySQL,
    /// SQLite (routines are rendered as explanatory SQL comments, not statements).
    SQLite,
    /// Microsoft SQL Server.
    MSSQL,
}
803
804impl ProcedureSqlGenerator {
805 pub fn new(db_type: DatabaseType) -> Self {
807 Self { db_type }
808 }
809
810 pub fn create_procedure(&self, proc: &ProcedureDefinition) -> String {
812 match self.db_type {
813 DatabaseType::PostgreSQL => self.create_postgres_procedure(proc),
814 DatabaseType::MySQL => self.create_mysql_procedure(proc),
815 DatabaseType::SQLite => self.create_sqlite_udf(proc),
816 DatabaseType::MSSQL => self.create_mssql_procedure(proc),
817 }
818 }
819
820 pub fn drop_procedure(&self, proc: &ProcedureDefinition) -> String {
822 let obj_type = if proc.is_function { "FUNCTION" } else { "PROCEDURE" };
823 let name = proc.qualified_name();
824
825 match self.db_type {
826 DatabaseType::PostgreSQL => {
827 let params = proc
829 .parameters
830 .iter()
831 .map(|p| p.data_type.as_str())
832 .collect::<Vec<_>>()
833 .join(", ");
834 format!("DROP {} IF EXISTS {}({});", obj_type, name, params)
835 }
836 DatabaseType::MySQL => {
837 format!("DROP {} IF EXISTS {};", obj_type, name)
838 }
839 DatabaseType::SQLite => {
840 format!("-- SQLite: Remove UDF registration for {}", name)
842 }
843 DatabaseType::MSSQL => {
844 format!(
845 "IF OBJECT_ID('{}', '{}') IS NOT NULL DROP {} {};",
846 name,
847 if proc.is_function { "FN" } else { "P" },
848 obj_type,
849 name
850 )
851 }
852 }
853 }
854
855 pub fn alter_procedure(&self, diff: &ProcedureAlterDiff) -> String {
857 match self.db_type {
859 DatabaseType::PostgreSQL => {
860 self.create_postgres_procedure(&diff.new)
862 }
863 DatabaseType::MySQL => {
864 format!(
866 "{}\n{}",
867 self.drop_procedure(&diff.old),
868 self.create_mysql_procedure(&diff.new)
869 )
870 }
871 DatabaseType::SQLite => {
872 self.create_sqlite_udf(&diff.new)
873 }
874 DatabaseType::MSSQL => {
875 self.alter_mssql_procedure(&diff.new)
877 }
878 }
879 }
880
881 pub fn create_trigger(&self, trigger: &TriggerDefinition) -> String {
883 match self.db_type {
884 DatabaseType::PostgreSQL => self.create_postgres_trigger(trigger),
885 DatabaseType::MySQL => self.create_mysql_trigger(trigger),
886 DatabaseType::SQLite => self.create_sqlite_trigger(trigger),
887 DatabaseType::MSSQL => self.create_mssql_trigger(trigger),
888 }
889 }
890
891 pub fn drop_trigger(&self, trigger: &TriggerDefinition) -> String {
893 match self.db_type {
894 DatabaseType::PostgreSQL => {
895 format!(
896 "DROP TRIGGER IF EXISTS {} ON {};",
897 trigger.name, trigger.table
898 )
899 }
900 DatabaseType::MySQL => {
901 format!("DROP TRIGGER IF EXISTS {};", trigger.name)
902 }
903 DatabaseType::SQLite => {
904 format!("DROP TRIGGER IF EXISTS {};", trigger.name)
905 }
906 DatabaseType::MSSQL => {
907 format!("DROP TRIGGER IF EXISTS {};", trigger.qualified_name())
908 }
909 }
910 }
911
912 fn create_postgres_procedure(&self, proc: &ProcedureDefinition) -> String {
914 let mut sql = String::new();
915
916 let obj_type = if proc.is_function { "FUNCTION" } else { "PROCEDURE" };
917 let or_replace = if proc.or_replace { "OR REPLACE " } else { "" };
918
919 sql.push_str(&format!("CREATE {}{} {} (", or_replace, obj_type, proc.qualified_name()));
920
921 let params: Vec<String> = proc
923 .parameters
924 .iter()
925 .map(|p| {
926 let mode = match p.mode {
927 ParameterMode::In => "",
928 ParameterMode::Out => "OUT ",
929 ParameterMode::InOut => "INOUT ",
930 ParameterMode::Variadic => "VARIADIC ",
931 };
932 let default = p
933 .default
934 .as_ref()
935 .map(|d| format!(" DEFAULT {}", d))
936 .unwrap_or_default();
937 format!("{}{} {}{}", mode, p.name, p.data_type, default)
938 })
939 .collect();
940 sql.push_str(¶ms.join(", "));
941 sql.push_str(")\n");
942
943 if let Some(ref ret) = proc.return_type {
945 if proc.returns_set {
946 sql.push_str(&format!("RETURNS SETOF {}\n", ret));
947 } else {
948 sql.push_str(&format!("RETURNS {}\n", ret));
949 }
950 } else if !proc.return_columns.is_empty() {
951 let cols: Vec<String> = proc
952 .return_columns
953 .iter()
954 .map(|c| format!("{} {}", c.name, c.data_type))
955 .collect();
956 sql.push_str(&format!("RETURNS TABLE ({})\n", cols.join(", ")));
957 } else if proc.is_function {
958 sql.push_str("RETURNS void\n");
959 }
960
961 sql.push_str(&format!("LANGUAGE {}\n", proc.language.to_sql()));
963
964 sql.push_str(&format!("{}\n", proc.volatility.to_sql()));
966
967 if proc.security_definer {
969 sql.push_str("SECURITY DEFINER\n");
970 }
971
972 if let Some(cost) = proc.cost {
974 sql.push_str(&format!("COST {}\n", cost));
975 }
976
977 if proc.parallel != ParallelSafety::Unsafe {
979 sql.push_str(&format!("{}\n", proc.parallel.to_sql()));
980 }
981
982 sql.push_str(&format!("AS $$\n{}\n$$;", proc.body));
984
985 if let Some(ref comment) = proc.comment {
987 sql.push_str(&format!(
988 "\n\nCOMMENT ON {} {} IS '{}';",
989 obj_type,
990 proc.qualified_name(),
991 comment.replace('\'', "''")
992 ));
993 }
994
995 sql
996 }
997
998 fn create_postgres_trigger(&self, trigger: &TriggerDefinition) -> String {
999 let mut sql = String::new();
1000
1001 let or_replace = if trigger.or_replace {
1002 "OR REPLACE "
1003 } else {
1004 ""
1005 };
1006
1007 sql.push_str(&format!(
1008 "CREATE {}TRIGGER {}\n",
1009 or_replace, trigger.name
1010 ));
1011
1012 sql.push_str(&format!("{} ", trigger.timing.to_sql()));
1014
1015 let events: Vec<&str> = trigger.events.iter().map(|e| e.to_sql()).collect();
1017 sql.push_str(&events.join(" OR "));
1018
1019 sql.push_str(&format!("\nON {}\n", trigger.table));
1021
1022 sql.push_str(&format!("{}\n", trigger.level.to_sql()));
1024
1025 if let Some(ref cond) = trigger.condition {
1027 sql.push_str(&format!("WHEN ({})\n", cond));
1028 }
1029
1030 let args = if trigger.function_args.is_empty() {
1032 String::new()
1033 } else {
1034 trigger.function_args.join(", ")
1035 };
1036 sql.push_str(&format!("EXECUTE FUNCTION {}({});", trigger.function, args));
1037
1038 sql
1039 }
1040
1041 fn create_mysql_procedure(&self, proc: &ProcedureDefinition) -> String {
1043 let mut sql = String::new();
1044
1045 let obj_type = if proc.is_function { "FUNCTION" } else { "PROCEDURE" };
1046
1047 sql.push_str(&format!("CREATE {} {} (", obj_type, proc.qualified_name()));
1049
1050 let params: Vec<String> = proc
1052 .parameters
1053 .iter()
1054 .map(|p| {
1055 let mode = match p.mode {
1056 ParameterMode::In => "IN ",
1057 ParameterMode::Out => "OUT ",
1058 ParameterMode::InOut => "INOUT ",
1059 ParameterMode::Variadic => "",
1060 };
1061 format!("{}{} {}", mode, p.name, p.data_type)
1062 })
1063 .collect();
1064 sql.push_str(¶ms.join(", "));
1065 sql.push_str(")\n");
1066
1067 if proc.is_function {
1069 if let Some(ref ret) = proc.return_type {
1070 sql.push_str(&format!("RETURNS {}\n", ret));
1071 }
1072 }
1073
1074 if proc.volatility == Volatility::Immutable {
1076 sql.push_str("DETERMINISTIC\n");
1077 } else {
1078 sql.push_str("NOT DETERMINISTIC\n");
1079 }
1080
1081 if proc.security_definer {
1082 sql.push_str("SQL SECURITY DEFINER\n");
1083 }
1084
1085 sql.push_str(&format!("BEGIN\n{}\nEND;", proc.body));
1087
1088 sql
1089 }
1090
1091 fn create_mysql_trigger(&self, trigger: &TriggerDefinition) -> String {
1092 let mut sql = String::new();
1093
1094 sql.push_str(&format!("CREATE TRIGGER {}\n", trigger.name));
1095
1096 sql.push_str(&format!("{} ", trigger.timing.to_sql()));
1098
1099 if let Some(event) = trigger.events.first() {
1101 sql.push_str(&format!("{}\n", event.to_sql()));
1102 }
1103
1104 sql.push_str(&format!("ON {}\n", trigger.table));
1106
1107 sql.push_str(&format!("{}\n", trigger.level.to_sql()));
1109
1110 sql.push_str(&format!("BEGIN\n CALL {}();\nEND;", trigger.function));
1112
1113 sql
1114 }
1115
1116 fn create_sqlite_udf(&self, proc: &ProcedureDefinition) -> String {
1118 format!(
1120 "-- SQLite UDF: {} must be registered via rusqlite::create_scalar_function\n\
1121 -- Parameters: {}\n\
1122 -- Body:\n-- {}",
1123 proc.name,
1124 proc.parameters
1125 .iter()
1126 .map(|p| format!("{}: {}", p.name, p.data_type))
1127 .collect::<Vec<_>>()
1128 .join(", "),
1129 proc.body.replace('\n', "\n-- ")
1130 )
1131 }
1132
1133 fn create_sqlite_trigger(&self, trigger: &TriggerDefinition) -> String {
1134 let mut sql = String::new();
1135
1136 sql.push_str(&format!("CREATE TRIGGER IF NOT EXISTS {}\n", trigger.name));
1137
1138 sql.push_str(&format!("{} ", trigger.timing.to_sql()));
1140
1141 let events: Vec<&str> = trigger.events.iter().map(|e| e.to_sql()).collect();
1143 sql.push_str(&events.join(" OR "));
1144
1145 sql.push_str(&format!("\nON {}\n", trigger.table));
1147
1148 sql.push_str("FOR EACH ROW\n");
1150
1151 if let Some(ref cond) = trigger.condition {
1153 sql.push_str(&format!("WHEN {}\n", cond));
1154 }
1155
1156 sql.push_str(&format!("BEGIN\n SELECT {}();\nEND;", trigger.function));
1158
1159 sql
1160 }
1161
1162 fn create_mssql_procedure(&self, proc: &ProcedureDefinition) -> String {
1164 let mut sql = String::new();
1165
1166 let obj_type = if proc.is_function { "FUNCTION" } else { "PROCEDURE" };
1167
1168 sql.push_str(&format!("CREATE {} {} (", obj_type, proc.qualified_name()));
1169
1170 let params: Vec<String> = proc
1172 .parameters
1173 .iter()
1174 .map(|p| {
1175 let output = if p.mode == ParameterMode::Out || p.mode == ParameterMode::InOut {
1176 " OUTPUT"
1177 } else {
1178 ""
1179 };
1180 format!("@{} {}{}", p.name, p.data_type, output)
1181 })
1182 .collect();
1183 sql.push_str(¶ms.join(", "));
1184 sql.push_str(")\n");
1185
1186 if proc.is_function {
1188 if let Some(ref ret) = proc.return_type {
1189 sql.push_str(&format!("RETURNS {}\n", ret));
1190 }
1191 }
1192
1193 sql.push_str("AS\nBEGIN\n");
1194 sql.push_str(&proc.body);
1195 sql.push_str("\nEND;");
1196
1197 sql
1198 }
1199
1200 fn alter_mssql_procedure(&self, proc: &ProcedureDefinition) -> String {
1201 self.create_mssql_procedure(proc).replacen("CREATE", "ALTER", 1)
1203 }
1204
1205 fn create_mssql_trigger(&self, trigger: &TriggerDefinition) -> String {
1206 let mut sql = String::new();
1207
1208 sql.push_str(&format!(
1209 "CREATE TRIGGER {}\nON {}\n",
1210 trigger.qualified_name(),
1211 trigger.table
1212 ));
1213
1214 sql.push_str(&format!("{} ", trigger.timing.to_sql()));
1216
1217 let events: Vec<&str> = trigger.events.iter().map(|e| e.to_sql()).collect();
1219 sql.push_str(&events.join(", "));
1220
1221 sql.push_str("\nAS\nBEGIN\n");
1222 sql.push_str(&format!(" EXEC {};\n", trigger.function));
1223 sql.push_str("END;");
1224
1225 sql
1226 }
1227
1228 pub fn generate_migration(&self, diff: &ProcedureDiff) -> MigrationSql {
1230 let mut up = Vec::new();
1231 let mut down = Vec::new();
1232
1233 for proc in &diff.create {
1235 up.push(self.create_procedure(proc));
1236 down.push(self.drop_procedure(proc));
1237 }
1238
1239 for name in &diff.drop {
1241 up.push(format!("DROP FUNCTION IF EXISTS {};", name));
1243 down.push(format!("-- Recreate {} (original definition needed)", name));
1245 }
1246
1247 for alter in &diff.alter {
1249 up.push(self.alter_procedure(alter));
1250 down.push(self.create_procedure(&alter.old));
1252 }
1253
1254 for trigger in &diff.create_triggers {
1256 up.push(self.create_trigger(trigger));
1257 down.push(self.drop_trigger(trigger));
1258 }
1259
1260 for name in &diff.drop_triggers {
1262 up.push(format!("DROP TRIGGER IF EXISTS {};", name));
1263 down.push(format!("-- Recreate trigger {} (original definition needed)", name));
1264 }
1265
1266 for alter in &diff.alter_triggers {
1268 up.push(self.drop_trigger(&alter.old));
1269 up.push(self.create_trigger(&alter.new));
1270 down.push(self.drop_trigger(&alter.new));
1271 down.push(self.create_trigger(&alter.old));
1272 }
1273
1274 MigrationSql {
1275 up: up.join("\n\n"),
1276 down: down.join("\n\n"),
1277 }
1278 }
1279}
1280
/// A pair of SQL scripts produced for a diff.
#[derive(Debug, Clone)]
pub struct MigrationSql {
    /// Statements that apply the change, separated by blank lines.
    pub up: String,
    /// Statements intended to revert the change, separated by blank lines.
    pub down: String,
}
1289
/// Serializable collection of known routines, triggers and scheduled events, plus a
/// change history.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProcedureStore {
    /// Routines keyed by qualified name (see `add_procedure`).
    pub procedures: HashMap<String, ProcedureDefinition>,
    /// Triggers keyed by qualified name (see `add_trigger`).
    pub triggers: HashMap<String, TriggerDefinition>,
    /// Scheduled events keyed by their bare `name` (see `add_event`).
    pub events: HashMap<String, ScheduledEvent>,
    /// Recorded changes.
    /// NOTE(review): nothing visible in this part of the file appends to it.
    pub history: Vec<ProcedureHistoryEntry>,
}
1306
/// One recorded change to a stored object.
///
/// NOTE(review): no code visible in this part of the file creates or reads entries, so the
/// field descriptions below follow the field names and types only.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProcedureHistoryEntry {
    /// When the change happened (stored as text; format not established here).
    pub timestamp: String,
    /// Identifier of the migration that made the change.
    pub migration_id: String,
    /// Kind of change.
    pub change_type: ChangeType,
    /// Name of the affected object.
    pub name: String,
    /// Checksum before the change, if any.
    pub old_checksum: Option<String>,
    /// Checksum after the change, if any.
    pub new_checksum: Option<String>,
}
1323
/// Kind of change recorded in a `ProcedureHistoryEntry`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ChangeType {
    /// The object was created.
    Create,
    /// The object was altered.
    Alter,
    /// The object was dropped.
    Drop,
}
1331
1332impl ProcedureStore {
1333 pub fn new() -> Self {
1335 Self::default()
1336 }
1337
1338 pub fn add_procedure(&mut self, proc: ProcedureDefinition) {
1340 self.procedures.insert(proc.qualified_name(), proc);
1341 }
1342
1343 pub fn add_trigger(&mut self, trigger: TriggerDefinition) {
1345 self.triggers.insert(trigger.qualified_name(), trigger);
1346 }
1347
1348 pub fn procedures_list(&self) -> Vec<&ProcedureDefinition> {
1350 self.procedures.values().collect()
1351 }
1352
1353 pub fn triggers_list(&self) -> Vec<&TriggerDefinition> {
1355 self.triggers.values().collect()
1356 }
1357
1358 pub fn save(&self, path: &std::path::Path) -> std::io::Result<()> {
1360 let content = toml::to_string_pretty(self)
1361 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
1362 std::fs::write(path, content)
1363 }
1364
1365 pub fn load(path: &std::path::Path) -> std::io::Result<Self> {
1367 let content = std::fs::read_to_string(path)?;
1368 toml::from_str(&content)
1369 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
1370 }
1371
1372 pub fn add_event(&mut self, event: ScheduledEvent) {
1374 self.events.insert(event.name.clone(), event);
1375 }
1376
1377 pub fn events_list(&self) -> Vec<&ScheduledEvent> {
1379 self.events.values().collect()
1380 }
1381}
1382
/// Describes a scheduled database event.
///
/// NOTE(review): the SQL generator for events is not visible in this part of the file, so
/// how each field is rendered is not confirmed here.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ScheduledEvent {
    /// Event name without schema qualification.
    pub name: String,
    /// Optional schema; `qualified_name()` renders `schema.name` when set.
    pub schema: Option<String>,
    /// When the event runs.
    pub schedule: EventSchedule,
    /// Statement text the event executes.
    pub body: String,
    /// Whether the event is enabled (defaults to `true`).
    pub enabled: bool,
    /// What happens to the event after it completes (defaults to `Drop`).
    pub on_completion: OnCompletion,
    /// Optional comment.
    pub comment: Option<String>,
    /// Optional start of the schedule window, stored as text.
    pub starts: Option<String>,
    /// Optional end of the schedule window, stored as text.
    pub ends: Option<String>,
    /// Optional definer account.
    pub definer: Option<String>,
}
1411
1412impl Default for ScheduledEvent {
1413 fn default() -> Self {
1414 Self {
1415 name: String::new(),
1416 schema: None,
1417 schedule: EventSchedule::Once,
1418 body: String::new(),
1419 enabled: true,
1420 on_completion: OnCompletion::Drop,
1421 comment: None,
1422 starts: None,
1423 ends: None,
1424 definer: None,
1425 }
1426 }
1427}
1428
1429impl ScheduledEvent {
1430 pub fn new(name: impl Into<String>) -> Self {
1432 Self {
1433 name: name.into(),
1434 ..Default::default()
1435 }
1436 }
1437
1438 pub fn schema(mut self, schema: impl Into<String>) -> Self {
1440 self.schema = Some(schema.into());
1441 self
1442 }
1443
1444 pub fn at(mut self, datetime: impl Into<String>) -> Self {
1446 self.schedule = EventSchedule::At(datetime.into());
1447 self
1448 }
1449
1450 pub fn every(mut self, interval: EventInterval) -> Self {
1452 self.schedule = EventSchedule::Every(interval);
1453 self
1454 }
1455
1456 pub fn body(mut self, body: impl Into<String>) -> Self {
1458 self.body = body.into();
1459 self
1460 }
1461
1462 pub fn disabled(mut self) -> Self {
1464 self.enabled = false;
1465 self
1466 }
1467
1468 pub fn preserve(mut self) -> Self {
1470 self.on_completion = OnCompletion::Preserve;
1471 self
1472 }
1473
1474 pub fn starts(mut self, datetime: impl Into<String>) -> Self {
1476 self.starts = Some(datetime.into());
1477 self
1478 }
1479
1480 pub fn ends(mut self, datetime: impl Into<String>) -> Self {
1482 self.ends = Some(datetime.into());
1483 self
1484 }
1485
1486 pub fn comment(mut self, comment: impl Into<String>) -> Self {
1488 self.comment = Some(comment.into());
1489 self
1490 }
1491
1492 pub fn qualified_name(&self) -> String {
1494 match &self.schema {
1495 Some(schema) => format!("{}.{}", schema, self.name),
1496 None => self.name.clone(),
1497 }
1498 }
1499}
1500
/// When a scheduled event runs.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum EventSchedule {
    /// Run once, with no time recorded here (the default schedule of `ScheduledEvent`).
    Once,
    /// Run at the given point in time, stored as text.
    At(String),
    /// Run repeatedly at the given interval.
    Every(EventInterval),
    /// Run according to a cron expression, stored as text.
    Cron(String),
}
1513
/// A repeat interval expressed as a quantity of a time unit.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EventInterval {
    /// Number of units between runs.
    pub quantity: u32,
    /// Unit the quantity is measured in.
    pub unit: IntervalUnit,
}
1522
1523impl EventInterval {
1524 pub fn new(quantity: u32, unit: IntervalUnit) -> Self {
1526 Self { quantity, unit }
1527 }
1528
1529 pub fn seconds(n: u32) -> Self {
1531 Self::new(n, IntervalUnit::Second)
1532 }
1533
1534 pub fn minutes(n: u32) -> Self {
1536 Self::new(n, IntervalUnit::Minute)
1537 }
1538
1539 pub fn hours(n: u32) -> Self {
1541 Self::new(n, IntervalUnit::Hour)
1542 }
1543
1544 pub fn days(n: u32) -> Self {
1546 Self::new(n, IntervalUnit::Day)
1547 }
1548
1549 pub fn weeks(n: u32) -> Self {
1551 Self::new(n, IntervalUnit::Week)
1552 }
1553
1554 pub fn months(n: u32) -> Self {
1556 Self::new(n, IntervalUnit::Month)
1557 }
1558
1559 pub fn to_mysql(&self) -> String {
1561 format!("{} {}", self.quantity, self.unit.to_mysql())
1562 }
1563}
1564
1565#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1567pub enum IntervalUnit {
1568 Second,
1569 Minute,
1570 Hour,
1571 Day,
1572 Week,
1573 Month,
1574 Quarter,
1575 Year,
1576}
1577
1578impl IntervalUnit {
1579 pub fn to_mysql(&self) -> &'static str {
1581 match self {
1582 Self::Second => "SECOND",
1583 Self::Minute => "MINUTE",
1584 Self::Hour => "HOUR",
1585 Self::Day => "DAY",
1586 Self::Week => "WEEK",
1587 Self::Month => "MONTH",
1588 Self::Quarter => "QUARTER",
1589 Self::Year => "YEAR",
1590 }
1591 }
1592}
1593
1594#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
1596pub enum OnCompletion {
1597 #[default]
1599 Drop,
1600 Preserve,
1602}
1603
1604#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1610pub struct SqlAgentJob {
1611 pub name: String,
1613 pub description: Option<String>,
1615 pub category: Option<String>,
1617 pub owner: Option<String>,
1619 pub enabled: bool,
1621 pub steps: Vec<JobStep>,
1623 pub schedules: Vec<JobSchedule>,
1625 pub notify_level: NotifyLevel,
1627}
1628
1629impl Default for SqlAgentJob {
1630 fn default() -> Self {
1631 Self {
1632 name: String::new(),
1633 description: None,
1634 category: None,
1635 owner: None,
1636 enabled: true,
1637 steps: Vec::new(),
1638 schedules: Vec::new(),
1639 notify_level: NotifyLevel::OnFailure,
1640 }
1641 }
1642}
1643
1644impl SqlAgentJob {
1645 pub fn new(name: impl Into<String>) -> Self {
1647 Self {
1648 name: name.into(),
1649 ..Default::default()
1650 }
1651 }
1652
1653 pub fn description(mut self, desc: impl Into<String>) -> Self {
1655 self.description = Some(desc.into());
1656 self
1657 }
1658
1659 pub fn step(mut self, step: JobStep) -> Self {
1661 self.steps.push(step);
1662 self
1663 }
1664
1665 pub fn tsql_step(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
1667 self.steps.push(JobStep {
1668 name: name.into(),
1669 step_type: StepType::TSql,
1670 command: sql.into(),
1671 database: None,
1672 on_success: StepAction::GoToNextStep,
1673 on_failure: StepAction::QuitWithFailure,
1674 retry_attempts: 0,
1675 retry_interval: 0,
1676 });
1677 self
1678 }
1679
1680 pub fn schedule(mut self, schedule: JobSchedule) -> Self {
1682 self.schedules.push(schedule);
1683 self
1684 }
1685
1686 pub fn disabled(mut self) -> Self {
1688 self.enabled = false;
1689 self
1690 }
1691}
1692
1693#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1695pub struct JobStep {
1696 pub name: String,
1698 pub step_type: StepType,
1700 pub command: String,
1702 pub database: Option<String>,
1704 pub on_success: StepAction,
1706 pub on_failure: StepAction,
1708 pub retry_attempts: u32,
1710 pub retry_interval: u32,
1712}
1713
1714#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
1716pub enum StepType {
1717 #[default]
1718 TSql,
1719 CmdExec,
1720 PowerShell,
1721 Ssis,
1722 Ssas,
1723 Ssrs,
1724}
1725
1726impl StepType {
1727 pub fn subsystem(&self) -> &'static str {
1729 match self {
1730 Self::TSql => "TSQL",
1731 Self::CmdExec => "CmdExec",
1732 Self::PowerShell => "PowerShell",
1733 Self::Ssis => "SSIS",
1734 Self::Ssas => "AnalysisCommand",
1735 Self::Ssrs => "Reporting Services Command",
1736 }
1737 }
1738}
1739
1740#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
1742pub enum StepAction {
1743 #[default]
1744 GoToNextStep,
1745 GoToStep(u32),
1746 QuitWithSuccess,
1747 QuitWithFailure,
1748}
1749
1750impl StepAction {
1751 pub fn action_id(&self) -> u32 {
1753 match self {
1754 Self::GoToNextStep => 3,
1755 Self::GoToStep(_) => 4,
1756 Self::QuitWithSuccess => 1,
1757 Self::QuitWithFailure => 2,
1758 }
1759 }
1760}
1761
1762#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1764pub struct JobSchedule {
1765 pub name: String,
1767 pub frequency: ScheduleFrequency,
1769 pub active_start_time: Option<String>,
1771 pub start_date: Option<String>,
1773 pub end_date: Option<String>,
1775 pub enabled: bool,
1777}
1778
1779impl Default for JobSchedule {
1780 fn default() -> Self {
1781 Self {
1782 name: String::new(),
1783 frequency: ScheduleFrequency::Daily { every_n_days: 1 },
1784 active_start_time: None,
1785 start_date: None,
1786 end_date: None,
1787 enabled: true,
1788 }
1789 }
1790}
1791
1792impl JobSchedule {
1793 pub fn new(name: impl Into<String>) -> Self {
1795 Self {
1796 name: name.into(),
1797 ..Default::default()
1798 }
1799 }
1800
1801 pub fn once(mut self) -> Self {
1803 self.frequency = ScheduleFrequency::Once;
1804 self
1805 }
1806
1807 pub fn daily(mut self, every_n_days: u32) -> Self {
1809 self.frequency = ScheduleFrequency::Daily { every_n_days };
1810 self
1811 }
1812
1813 pub fn weekly(mut self, days: Vec<Weekday>) -> Self {
1815 self.frequency = ScheduleFrequency::Weekly {
1816 every_n_weeks: 1,
1817 days,
1818 };
1819 self
1820 }
1821
1822 pub fn monthly(mut self, day_of_month: u32) -> Self {
1824 self.frequency = ScheduleFrequency::Monthly {
1825 every_n_months: 1,
1826 day_of_month,
1827 };
1828 self
1829 }
1830
1831 pub fn at(mut self, time: impl Into<String>) -> Self {
1833 self.active_start_time = Some(time.into());
1834 self
1835 }
1836}
1837
1838#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1840pub enum ScheduleFrequency {
1841 Once,
1842 Daily { every_n_days: u32 },
1843 Weekly { every_n_weeks: u32, days: Vec<Weekday> },
1844 Monthly { every_n_months: u32, day_of_month: u32 },
1845 OnIdle,
1846 OnAgentStart,
1847}
1848
1849impl ScheduleFrequency {
1850 pub fn freq_type(&self) -> u32 {
1852 match self {
1853 Self::Once => 1,
1854 Self::Daily { .. } => 4,
1855 Self::Weekly { .. } => 8,
1856 Self::Monthly { .. } => 16,
1857 Self::OnIdle => 128,
1858 Self::OnAgentStart => 64,
1859 }
1860 }
1861
1862 pub fn freq_interval(&self) -> u32 {
1864 match self {
1865 Self::Once => 0,
1866 Self::Daily { every_n_days } => *every_n_days,
1867 Self::Weekly { days, .. } => {
1868 days.iter().map(|d| d.bitmask()).fold(0, |acc, m| acc | m)
1869 }
1870 Self::Monthly { day_of_month, .. } => *day_of_month,
1871 Self::OnIdle | Self::OnAgentStart => 0,
1872 }
1873 }
1874}
1875
1876#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1878pub enum Weekday {
1879 Sunday,
1880 Monday,
1881 Tuesday,
1882 Wednesday,
1883 Thursday,
1884 Friday,
1885 Saturday,
1886}
1887
1888impl Weekday {
1889 pub fn bitmask(&self) -> u32 {
1891 match self {
1892 Self::Sunday => 1,
1893 Self::Monday => 2,
1894 Self::Tuesday => 4,
1895 Self::Wednesday => 8,
1896 Self::Thursday => 16,
1897 Self::Friday => 32,
1898 Self::Saturday => 64,
1899 }
1900 }
1901}
1902
1903#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
1905pub enum NotifyLevel {
1906 Never,
1907 OnSuccess,
1908 #[default]
1909 OnFailure,
1910 Always,
1911}
1912
1913#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1919pub struct AtlasTrigger {
1920 pub name: String,
1922 pub trigger_type: AtlasTriggerType,
1924 pub enabled: bool,
1926 pub function_name: String,
1928 pub match_expression: Option<String>,
1930 pub project: Option<String>,
1932 pub full_document: bool,
1934 pub full_document_before_change: bool,
1936}
1937
1938impl Default for AtlasTrigger {
1939 fn default() -> Self {
1940 Self {
1941 name: String::new(),
1942 trigger_type: AtlasTriggerType::Database {
1943 database: String::new(),
1944 collection: String::new(),
1945 operation_types: Vec::new(),
1946 },
1947 enabled: true,
1948 function_name: String::new(),
1949 match_expression: None,
1950 project: None,
1951 full_document: false,
1952 full_document_before_change: false,
1953 }
1954 }
1955}
1956
1957impl AtlasTrigger {
1958 pub fn new(name: impl Into<String>) -> Self {
1960 Self {
1961 name: name.into(),
1962 ..Default::default()
1963 }
1964 }
1965
1966 pub fn database(
1968 mut self,
1969 database: impl Into<String>,
1970 collection: impl Into<String>,
1971 operations: Vec<AtlasOperation>,
1972 ) -> Self {
1973 self.trigger_type = AtlasTriggerType::Database {
1974 database: database.into(),
1975 collection: collection.into(),
1976 operation_types: operations,
1977 };
1978 self
1979 }
1980
1981 pub fn scheduled(mut self, cron: impl Into<String>) -> Self {
1983 self.trigger_type = AtlasTriggerType::Scheduled {
1984 schedule: cron.into(),
1985 };
1986 self
1987 }
1988
1989 pub fn authentication(mut self, operation: AuthOperation) -> Self {
1991 self.trigger_type = AtlasTriggerType::Authentication { operation };
1992 self
1993 }
1994
1995 pub fn function(mut self, name: impl Into<String>) -> Self {
1997 self.function_name = name.into();
1998 self
1999 }
2000
2001 pub fn full_document(mut self) -> Self {
2003 self.full_document = true;
2004 self
2005 }
2006
2007 pub fn disabled(mut self) -> Self {
2009 self.enabled = false;
2010 self
2011 }
2012}
2013
2014#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
2016pub enum AtlasTriggerType {
2017 Database {
2019 database: String,
2020 collection: String,
2021 operation_types: Vec<AtlasOperation>,
2022 },
2023 Scheduled { schedule: String },
2025 Authentication { operation: AuthOperation },
2027}
2028
2029#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
2031pub enum AtlasOperation {
2032 Insert,
2033 Update,
2034 Replace,
2035 Delete,
2036}
2037
2038impl AtlasOperation {
2039 pub fn as_str(&self) -> &'static str {
2041 match self {
2042 Self::Insert => "INSERT",
2043 Self::Update => "UPDATE",
2044 Self::Replace => "REPLACE",
2045 Self::Delete => "DELETE",
2046 }
2047 }
2048}
2049
2050#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
2052pub enum AuthOperation {
2053 Create,
2054 Login,
2055 Delete,
2056}
2057
2058impl ProcedureSqlGenerator {
2063 pub fn create_event(&self, event: &ScheduledEvent) -> String {
2065 match self.db_type {
2066 DatabaseType::MySQL => self.create_mysql_event(event),
2067 DatabaseType::MSSQL => {
2068 format!(
2070 "-- Use SqlAgentJob for MSSQL scheduled tasks\n\
2071 -- Event: {}\n\
2072 -- Schedule: {:?}",
2073 event.name, event.schedule
2074 )
2075 }
2076 _ => format!("-- Events not supported for {:?}", self.db_type),
2077 }
2078 }
2079
2080 pub fn drop_event(&self, event: &ScheduledEvent) -> String {
2082 match self.db_type {
2083 DatabaseType::MySQL => {
2084 format!("DROP EVENT IF EXISTS {};", event.qualified_name())
2085 }
2086 _ => format!("-- DROP EVENT not supported for {:?}", self.db_type),
2087 }
2088 }
2089
2090 fn create_mysql_event(&self, event: &ScheduledEvent) -> String {
2091 let mut sql = String::new();
2092
2093 sql.push_str(&format!("CREATE EVENT IF NOT EXISTS {}\n", event.qualified_name()));
2094
2095 match &event.schedule {
2097 EventSchedule::Once => sql.push_str("ON SCHEDULE AT CURRENT_TIMESTAMP\n"),
2098 EventSchedule::At(datetime) => sql.push_str(&format!("ON SCHEDULE AT '{}'\n", datetime)),
2099 EventSchedule::Every(interval) => {
2100 sql.push_str(&format!("ON SCHEDULE EVERY {}\n", interval.to_mysql()));
2101 if let Some(ref starts) = event.starts {
2102 sql.push_str(&format!("STARTS '{}'\n", starts));
2103 }
2104 if let Some(ref ends) = event.ends {
2105 sql.push_str(&format!("ENDS '{}'\n", ends));
2106 }
2107 }
2108 EventSchedule::Cron(_) => {
2109 sql.push_str("ON SCHEDULE EVERY 1 DAY\n");
2111 }
2112 }
2113
2114 match event.on_completion {
2116 OnCompletion::Drop => sql.push_str("ON COMPLETION NOT PRESERVE\n"),
2117 OnCompletion::Preserve => sql.push_str("ON COMPLETION PRESERVE\n"),
2118 }
2119
2120 if event.enabled {
2122 sql.push_str("ENABLE\n");
2123 } else {
2124 sql.push_str("DISABLE\n");
2125 }
2126
2127 if let Some(ref comment) = event.comment {
2129 sql.push_str(&format!("COMMENT '{}'\n", comment.replace('\'', "''")));
2130 }
2131
2132 sql.push_str(&format!("DO\n{};", event.body));
2134
2135 sql
2136 }
2137
2138 pub fn create_sql_agent_job(&self, job: &SqlAgentJob) -> String {
2140 let mut sql = String::new();
2141
2142 sql.push_str("-- Create SQL Agent Job\n");
2144 sql.push_str("EXEC msdb.dbo.sp_add_job\n");
2145 sql.push_str(&format!(" @job_name = N'{}',\n", job.name));
2146
2147 if let Some(ref desc) = job.description {
2148 sql.push_str(&format!(" @description = N'{}',\n", desc));
2149 }
2150
2151 sql.push_str(&format!(
2152 " @enabled = {};\n\n",
2153 if job.enabled { 1 } else { 0 }
2154 ));
2155
2156 for (i, step) in job.steps.iter().enumerate() {
2158 sql.push_str(&format!("-- Step {}: {}\n", i + 1, step.name));
2159 sql.push_str("EXEC msdb.dbo.sp_add_jobstep\n");
2160 sql.push_str(&format!(" @job_name = N'{}',\n", job.name));
2161 sql.push_str(&format!(" @step_name = N'{}',\n", step.name));
2162 sql.push_str(&format!(" @subsystem = N'{}',\n", step.step_type.subsystem()));
2163 sql.push_str(&format!(
2164 " @command = N'{}',\n",
2165 step.command.replace('\'', "''")
2166 ));
2167
2168 if let Some(ref db) = step.database {
2169 sql.push_str(&format!(" @database_name = N'{}',\n", db));
2170 }
2171
2172 sql.push_str(&format!(
2173 " @on_success_action = {},\n",
2174 step.on_success.action_id()
2175 ));
2176 sql.push_str(&format!(
2177 " @on_fail_action = {},\n",
2178 step.on_failure.action_id()
2179 ));
2180 sql.push_str(&format!(" @retry_attempts = {},\n", step.retry_attempts));
2181 sql.push_str(&format!(" @retry_interval = {};\n\n", step.retry_interval));
2182 }
2183
2184 for schedule in &job.schedules {
2186 sql.push_str(&format!("-- Schedule: {}\n", schedule.name));
2187 sql.push_str("EXEC msdb.dbo.sp_add_schedule\n");
2188 sql.push_str(&format!(" @schedule_name = N'{}',\n", schedule.name));
2189 sql.push_str(&format!(
2190 " @enabled = {},\n",
2191 if schedule.enabled { 1 } else { 0 }
2192 ));
2193 sql.push_str(&format!(
2194 " @freq_type = {},\n",
2195 schedule.frequency.freq_type()
2196 ));
2197 sql.push_str(&format!(
2198 " @freq_interval = {};\n",
2199 schedule.frequency.freq_interval()
2200 ));
2201
2202 sql.push_str("\nEXEC msdb.dbo.sp_attach_schedule\n");
2204 sql.push_str(&format!(" @job_name = N'{}',\n", job.name));
2205 sql.push_str(&format!(" @schedule_name = N'{}';\n\n", schedule.name));
2206 }
2207
2208 sql.push_str("EXEC msdb.dbo.sp_add_jobserver\n");
2210 sql.push_str(&format!(" @job_name = N'{}',\n", job.name));
2211 sql.push_str(" @server_name = N'(LOCAL)';\n");
2212
2213 sql
2214 }
2215
2216 pub fn drop_sql_agent_job(&self, job_name: &str) -> String {
2218 format!(
2219 "IF EXISTS (SELECT 1 FROM msdb.dbo.sysjobs WHERE name = N'{}')\n\
2220 BEGIN\n\
2221 EXEC msdb.dbo.sp_delete_job @job_name = N'{}';\n\
2222 END;",
2223 job_name, job_name
2224 )
2225 }
2226}
2227
2228#[derive(Debug, Clone, Default)]
2234pub struct EventDiff {
2235 pub create: Vec<ScheduledEvent>,
2237 pub drop: Vec<String>,
2239 pub alter: Vec<EventAlterDiff>,
2241 pub create_jobs: Vec<SqlAgentJob>,
2243 pub drop_jobs: Vec<String>,
2245}
2246
2247#[derive(Debug, Clone)]
2249pub struct EventAlterDiff {
2250 pub old: ScheduledEvent,
2252 pub new: ScheduledEvent,
2254}
2255
2256impl EventDiff {
2257 pub fn is_empty(&self) -> bool {
2259 self.create.is_empty()
2260 && self.drop.is_empty()
2261 && self.alter.is_empty()
2262 && self.create_jobs.is_empty()
2263 && self.drop_jobs.is_empty()
2264 }
2265}
2266
#[cfg(test)]
mod tests {
    use super::*;

    /// The function builder records name, schema, parameters and kind.
    #[test]
    fn test_procedure_definition() {
        let definition = ProcedureDefinition::function("calculate_tax")
            .schema("public")
            .param("amount", "DECIMAL(10,2)")
            .param("rate", "DECIMAL(5,4)")
            .returns("DECIMAL(10,2)")
            .language(ProcedureLanguage::PlPgSql)
            .immutable()
            .body("RETURN amount * rate;");

        assert_eq!(definition.name, "calculate_tax");
        assert_eq!(definition.qualified_name(), "public.calculate_tax");
        assert_eq!(definition.parameters.len(), 2);
        assert!(definition.is_function);
    }

    /// The trigger builder records name, table, timing and events.
    #[test]
    fn test_trigger_definition() {
        let audit = TriggerDefinition::new("audit_users", "users")
            .after()
            .on(vec![TriggerEvent::Insert, TriggerEvent::Update])
            .for_each_row()
            .execute("audit_trigger_fn");

        assert_eq!(audit.name, "audit_users");
        assert_eq!(audit.table, "users");
        assert_eq!(audit.timing, TriggerTiming::After);
        assert_eq!(audit.events.len(), 2);
    }

    /// Diffing two procedure sets yields one create, one drop and one alter.
    #[test]
    fn test_procedure_diff() {
        let before = vec![
            ProcedureDefinition::function("fn1").body("v1"),
            ProcedureDefinition::function("fn2").body("v2"),
        ];
        let after = vec![
            ProcedureDefinition::function("fn1").body("v1_updated"),
            ProcedureDefinition::function("fn3").body("v3"),
        ];

        let diff = ProcedureDiffer::diff(&before, &after);

        // `fn3` appears only in the new set.
        assert_eq!(diff.create.len(), 1);
        // `fn2` appears only in the old set.
        assert_eq!(diff.drop.len(), 1);
        // `fn1` is in both sets with a different body.
        assert_eq!(diff.alter.len(), 1);
    }

    /// PostgreSQL function DDL carries the expected clauses.
    #[test]
    fn test_postgres_sql_generation() {
        let generator = ProcedureSqlGenerator::new(DatabaseType::PostgreSQL);

        let greet = ProcedureDefinition::function("greet")
            .param("name", "TEXT")
            .returns("TEXT")
            .language(ProcedureLanguage::Sql)
            .immutable()
            .body("SELECT 'Hello, ' || name || '!';");

        let ddl = generator.create_procedure(&greet);
        assert!(ddl.contains("CREATE OR REPLACE"));
        assert!(ddl.contains("FUNCTION"));
        assert!(ddl.contains("RETURNS TEXT"));
        assert!(ddl.contains("IMMUTABLE"));
    }

    /// PostgreSQL trigger DDL carries the expected clauses.
    #[test]
    fn test_trigger_sql_generation() {
        let generator = ProcedureSqlGenerator::new(DatabaseType::PostgreSQL);

        let touch = TriggerDefinition::new("update_timestamp", "users")
            .before()
            .on(vec![TriggerEvent::Update])
            .for_each_row()
            .execute("set_updated_at");

        let ddl = generator.create_trigger(&touch);
        assert!(ddl.contains("CREATE OR REPLACE TRIGGER"));
        assert!(ddl.contains("BEFORE UPDATE"));
        assert!(ddl.contains("FOR EACH ROW"));
    }

    /// The store keeps what was added to it.
    #[test]
    fn test_procedure_store() {
        let mut store = ProcedureStore::new();

        store.add_procedure(ProcedureDefinition::function("fn1").body("test"));
        store.add_trigger(TriggerDefinition::new("tr1", "table1"));

        assert_eq!(store.procedures.len(), 1);
        assert_eq!(store.triggers.len(), 1);
    }

    /// The event builder records name, schedule and completion policy.
    #[test]
    fn test_scheduled_event() {
        let cleanup = ScheduledEvent::new("cleanup_old_data")
            .every(EventInterval::days(1))
            .body("DELETE FROM logs WHERE created_at < NOW() - INTERVAL 30 DAY")
            .preserve()
            .comment("Daily cleanup of old log entries");

        assert_eq!(cleanup.name, "cleanup_old_data");
        assert!(matches!(cleanup.schedule, EventSchedule::Every(_)));
        assert_eq!(cleanup.on_completion, OnCompletion::Preserve);
    }

    /// MySQL event DDL carries the create clause and the schedule.
    #[test]
    fn test_mysql_event_sql() {
        let generator = ProcedureSqlGenerator::new(DatabaseType::MySQL);

        let stats = ScheduledEvent::new("hourly_stats")
            .every(EventInterval::hours(1))
            .body("CALL update_statistics()");

        let ddl = generator.create_event(&stats);
        assert!(ddl.contains("CREATE EVENT IF NOT EXISTS"));
        assert!(ddl.contains("ON SCHEDULE EVERY 1 HOUR"));
    }

    /// The job builder records name, steps and schedules.
    #[test]
    fn test_sql_agent_job() {
        let two_am = JobSchedule::new("daily_2am").daily(1).at("02:00:00");
        let backup = SqlAgentJob::new("nightly_backup")
            .description("Nightly database backup")
            .tsql_step("Backup", "BACKUP DATABASE mydb TO DISK = 'C:\\backups\\mydb.bak'")
            .schedule(two_am);

        assert_eq!(backup.name, "nightly_backup");
        assert_eq!(backup.steps.len(), 1);
        assert_eq!(backup.schedules.len(), 1);
    }

    /// Agent-job T-SQL calls the job and job-step procedures.
    #[test]
    fn test_sql_agent_job_sql() {
        let generator = ProcedureSqlGenerator::new(DatabaseType::MSSQL);

        let job = SqlAgentJob::new("test_job").tsql_step("Step1", "SELECT 1");

        let batch = generator.create_sql_agent_job(&job);
        assert!(batch.contains("sp_add_job"));
        assert!(batch.contains("sp_add_jobstep"));
    }

    /// The Atlas trigger builder records name, type and the document flag.
    #[test]
    fn test_atlas_trigger() {
        let on_create = AtlasTrigger::new("on_user_create")
            .database("mydb", "users", vec![AtlasOperation::Insert])
            .function("handleUserCreate")
            .full_document();

        assert_eq!(on_create.name, "on_user_create");
        assert!(on_create.full_document);
        assert!(matches!(
            on_create.trigger_type,
            AtlasTriggerType::Database { .. }
        ));
    }
}
2431