1use std::collections::HashMap;
30
31use serde::{Deserialize, Serialize};
32
33#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
39pub struct ProcedureDefinition {
40 pub name: String,
42 pub schema: Option<String>,
44 pub is_function: bool,
46 pub parameters: Vec<ProcedureParameter>,
48 pub return_type: Option<String>,
50 pub returns_set: bool,
52 pub return_columns: Vec<ReturnColumn>,
54 pub language: ProcedureLanguage,
56 pub body: String,
58 pub volatility: Volatility,
60 pub security_definer: bool,
62 pub cost: Option<i32>,
64 pub rows: Option<i32>,
66 pub parallel: ParallelSafety,
68 pub or_replace: bool,
70 pub comment: Option<String>,
72 pub checksum: Option<String>,
74 pub version: Option<i32>,
76}
77
78impl Default for ProcedureDefinition {
79 fn default() -> Self {
80 Self {
81 name: String::new(),
82 schema: None,
83 is_function: true,
84 parameters: Vec::new(),
85 return_type: None,
86 returns_set: false,
87 return_columns: Vec::new(),
88 language: ProcedureLanguage::Sql,
89 body: String::new(),
90 volatility: Volatility::Volatile,
91 security_definer: false,
92 cost: None,
93 rows: None,
94 parallel: ParallelSafety::Unsafe,
95 or_replace: true,
96 comment: None,
97 checksum: None,
98 version: None,
99 }
100 }
101}
102
103impl ProcedureDefinition {
104 pub fn new(name: impl Into<String>) -> Self {
106 Self {
107 name: name.into(),
108 ..Default::default()
109 }
110 }
111
112 pub fn function(name: impl Into<String>) -> Self {
114 Self {
115 name: name.into(),
116 is_function: true,
117 ..Default::default()
118 }
119 }
120
121 pub fn procedure(name: impl Into<String>) -> Self {
123 Self {
124 name: name.into(),
125 is_function: false,
126 ..Default::default()
127 }
128 }
129
130 pub fn schema(mut self, schema: impl Into<String>) -> Self {
132 self.schema = Some(schema.into());
133 self
134 }
135
136 pub fn param(mut self, name: impl Into<String>, data_type: impl Into<String>) -> Self {
138 self.parameters.push(ProcedureParameter {
139 name: name.into(),
140 data_type: data_type.into(),
141 mode: ParameterMode::In,
142 default: None,
143 });
144 self
145 }
146
147 pub fn out_param(mut self, name: impl Into<String>, data_type: impl Into<String>) -> Self {
149 self.parameters.push(ProcedureParameter {
150 name: name.into(),
151 data_type: data_type.into(),
152 mode: ParameterMode::Out,
153 default: None,
154 });
155 self
156 }
157
158 pub fn inout_param(mut self, name: impl Into<String>, data_type: impl Into<String>) -> Self {
160 self.parameters.push(ProcedureParameter {
161 name: name.into(),
162 data_type: data_type.into(),
163 mode: ParameterMode::InOut,
164 default: None,
165 });
166 self
167 }
168
169 pub fn returns(mut self, return_type: impl Into<String>) -> Self {
171 self.return_type = Some(return_type.into());
172 self
173 }
174
175 pub fn returns_setof(mut self, return_type: impl Into<String>) -> Self {
177 self.return_type = Some(return_type.into());
178 self.returns_set = true;
179 self
180 }
181
182 pub fn returns_table(mut self, columns: Vec<ReturnColumn>) -> Self {
184 self.returns_set = true;
185 self.return_columns = columns;
186 self
187 }
188
189 pub fn language(mut self, language: ProcedureLanguage) -> Self {
191 self.language = language;
192 self
193 }
194
195 pub fn body(mut self, body: impl Into<String>) -> Self {
197 self.body = body.into();
198 self.update_checksum();
199 self
200 }
201
202 pub fn volatility(mut self, volatility: Volatility) -> Self {
204 self.volatility = volatility;
205 self
206 }
207
208 pub fn immutable(mut self) -> Self {
210 self.volatility = Volatility::Immutable;
211 self
212 }
213
214 pub fn stable(mut self) -> Self {
216 self.volatility = Volatility::Stable;
217 self
218 }
219
220 pub fn security_definer(mut self) -> Self {
222 self.security_definer = true;
223 self
224 }
225
226 pub fn cost(mut self, cost: i32) -> Self {
228 self.cost = Some(cost);
229 self
230 }
231
232 pub fn parallel(mut self, parallel: ParallelSafety) -> Self {
234 self.parallel = parallel;
235 self
236 }
237
238 pub fn comment(mut self, comment: impl Into<String>) -> Self {
240 self.comment = Some(comment.into());
241 self
242 }
243
244 fn update_checksum(&mut self) {
246 use std::collections::hash_map::DefaultHasher;
247 use std::hash::{Hash, Hasher};
248
249 let mut hasher = DefaultHasher::new();
250 self.body.hash(&mut hasher);
251 self.checksum = Some(format!("{:016x}", hasher.finish()));
252 }
253
254 pub fn qualified_name(&self) -> String {
256 match &self.schema {
257 Some(schema) => format!("{}.{}", schema, self.name),
258 None => self.name.clone(),
259 }
260 }
261
262 pub fn has_changed(&self, other: &ProcedureDefinition) -> bool {
264 if let (Some(a), Some(b)) = (&self.checksum, &other.checksum) {
266 if a != b {
267 return true;
268 }
269 }
270
271 self.body != other.body
273 || self.parameters != other.parameters
274 || self.return_type != other.return_type
275 || self.returns_set != other.returns_set
276 || self.language != other.language
277 || self.volatility != other.volatility
278 || self.security_definer != other.security_definer
279 }
280}
281
282#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
284pub struct ProcedureParameter {
285 pub name: String,
287 pub data_type: String,
289 pub mode: ParameterMode,
291 pub default: Option<String>,
293}
294
295#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
297pub enum ParameterMode {
298 #[default]
299 In,
300 Out,
301 InOut,
302 Variadic,
303}
304
305#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
307pub struct ReturnColumn {
308 pub name: String,
310 pub data_type: String,
312}
313
314impl ReturnColumn {
315 pub fn new(name: impl Into<String>, data_type: impl Into<String>) -> Self {
317 Self {
318 name: name.into(),
319 data_type: data_type.into(),
320 }
321 }
322}
323
324#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
326pub enum ProcedureLanguage {
327 #[default]
328 Sql,
329 PlPgSql,
330 PlPython,
331 PlPerl,
332 PlTcl,
333 PlV8,
334 C,
335}
336
337impl ProcedureLanguage {
338 pub fn to_sql(&self) -> &'static str {
340 match self {
341 Self::Sql => "SQL",
342 Self::PlPgSql => "plpgsql",
343 Self::PlPython => "plpython3u",
344 Self::PlPerl => "plperl",
345 Self::PlTcl => "pltcl",
346 Self::PlV8 => "plv8",
347 Self::C => "C",
348 }
349 }
350}
351
352#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
354pub enum Volatility {
355 #[default]
356 Volatile,
357 Stable,
358 Immutable,
359}
360
361impl Volatility {
362 pub fn to_sql(&self) -> &'static str {
364 match self {
365 Self::Volatile => "VOLATILE",
366 Self::Stable => "STABLE",
367 Self::Immutable => "IMMUTABLE",
368 }
369 }
370}
371
372#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
374pub enum ParallelSafety {
375 #[default]
376 Unsafe,
377 Restricted,
378 Safe,
379}
380
381impl ParallelSafety {
382 pub fn to_sql(&self) -> &'static str {
384 match self {
385 Self::Unsafe => "PARALLEL UNSAFE",
386 Self::Restricted => "PARALLEL RESTRICTED",
387 Self::Safe => "PARALLEL SAFE",
388 }
389 }
390}
391
392#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
398pub struct TriggerDefinition {
399 pub name: String,
401 pub schema: Option<String>,
403 pub table: String,
405 pub timing: TriggerTiming,
407 pub events: Vec<TriggerEvent>,
409 pub level: TriggerLevel,
411 pub condition: Option<String>,
413 pub function: String,
415 pub function_args: Vec<String>,
417 pub or_replace: bool,
419 pub comment: Option<String>,
421 pub checksum: Option<String>,
423}
424
425impl Default for TriggerDefinition {
426 fn default() -> Self {
427 Self {
428 name: String::new(),
429 schema: None,
430 table: String::new(),
431 timing: TriggerTiming::Before,
432 events: vec![TriggerEvent::Insert],
433 level: TriggerLevel::Row,
434 condition: None,
435 function: String::new(),
436 function_args: Vec::new(),
437 or_replace: true,
438 comment: None,
439 checksum: None,
440 }
441 }
442}
443
444impl TriggerDefinition {
445 pub fn new(name: impl Into<String>, table: impl Into<String>) -> Self {
447 Self {
448 name: name.into(),
449 table: table.into(),
450 ..Default::default()
451 }
452 }
453
454 pub fn before(mut self) -> Self {
456 self.timing = TriggerTiming::Before;
457 self
458 }
459
460 pub fn after(mut self) -> Self {
462 self.timing = TriggerTiming::After;
463 self
464 }
465
466 pub fn instead_of(mut self) -> Self {
468 self.timing = TriggerTiming::InsteadOf;
469 self
470 }
471
472 pub fn on(mut self, events: Vec<TriggerEvent>) -> Self {
474 self.events = events;
475 self
476 }
477
478 pub fn for_each_row(mut self) -> Self {
480 self.level = TriggerLevel::Row;
481 self
482 }
483
484 pub fn for_each_statement(mut self) -> Self {
486 self.level = TriggerLevel::Statement;
487 self
488 }
489
490 pub fn when(mut self, condition: impl Into<String>) -> Self {
492 self.condition = Some(condition.into());
493 self
494 }
495
496 pub fn execute(mut self, function: impl Into<String>) -> Self {
498 self.function = function.into();
499 self
500 }
501
502 pub fn qualified_name(&self) -> String {
504 match &self.schema {
505 Some(schema) => format!("{}.{}", schema, self.name),
506 None => self.name.clone(),
507 }
508 }
509}
510
511#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
513pub enum TriggerTiming {
514 #[default]
515 Before,
516 After,
517 InsteadOf,
518}
519
520impl TriggerTiming {
521 pub fn to_sql(&self) -> &'static str {
523 match self {
524 Self::Before => "BEFORE",
525 Self::After => "AFTER",
526 Self::InsteadOf => "INSTEAD OF",
527 }
528 }
529}
530
531#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
533pub enum TriggerEvent {
534 Insert,
535 Update,
536 Delete,
537 Truncate,
538}
539
540impl TriggerEvent {
541 pub fn to_sql(&self) -> &'static str {
543 match self {
544 Self::Insert => "INSERT",
545 Self::Update => "UPDATE",
546 Self::Delete => "DELETE",
547 Self::Truncate => "TRUNCATE",
548 }
549 }
550}
551
552#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
554pub enum TriggerLevel {
555 #[default]
556 Row,
557 Statement,
558}
559
560impl TriggerLevel {
561 pub fn to_sql(&self) -> &'static str {
563 match self {
564 Self::Row => "FOR EACH ROW",
565 Self::Statement => "FOR EACH STATEMENT",
566 }
567 }
568}
569
570#[derive(Debug, Clone, Default)]
576pub struct ProcedureDiff {
577 pub create: Vec<ProcedureDefinition>,
579 pub drop: Vec<String>,
581 pub alter: Vec<ProcedureAlterDiff>,
583 pub create_triggers: Vec<TriggerDefinition>,
585 pub drop_triggers: Vec<String>,
587 pub alter_triggers: Vec<TriggerAlterDiff>,
589}
590
591#[derive(Debug, Clone)]
593pub struct ProcedureAlterDiff {
594 pub old: ProcedureDefinition,
596 pub new: ProcedureDefinition,
598 pub changes: Vec<ProcedureChange>,
600}
601
602#[derive(Debug, Clone, PartialEq, Eq)]
604pub enum ProcedureChange {
605 Body,
606 Parameters,
607 ReturnType,
608 Language,
609 Volatility,
610 SecurityDefiner,
611 Cost,
612 Parallel,
613}
614
615#[derive(Debug, Clone)]
617pub struct TriggerAlterDiff {
618 pub old: TriggerDefinition,
620 pub new: TriggerDefinition,
622}
623
624impl ProcedureDiff {
625 pub fn is_empty(&self) -> bool {
627 self.create.is_empty()
628 && self.drop.is_empty()
629 && self.alter.is_empty()
630 && self.create_triggers.is_empty()
631 && self.drop_triggers.is_empty()
632 && self.alter_triggers.is_empty()
633 }
634
635 pub fn summary(&self) -> String {
637 let mut parts = Vec::new();
638
639 if !self.create.is_empty() {
640 parts.push(format!("Create {} procedures", self.create.len()));
641 }
642 if !self.drop.is_empty() {
643 parts.push(format!("Drop {} procedures", self.drop.len()));
644 }
645 if !self.alter.is_empty() {
646 parts.push(format!("Alter {} procedures", self.alter.len()));
647 }
648 if !self.create_triggers.is_empty() {
649 parts.push(format!("Create {} triggers", self.create_triggers.len()));
650 }
651 if !self.drop_triggers.is_empty() {
652 parts.push(format!("Drop {} triggers", self.drop_triggers.len()));
653 }
654 if !self.alter_triggers.is_empty() {
655 parts.push(format!("Alter {} triggers", self.alter_triggers.len()));
656 }
657
658 if parts.is_empty() {
659 "No changes".to_string()
660 } else {
661 parts.join(", ")
662 }
663 }
664}
665
666pub struct ProcedureDiffer;
668
669impl ProcedureDiffer {
670 pub fn diff(from: &[ProcedureDefinition], to: &[ProcedureDefinition]) -> ProcedureDiff {
672 let mut diff = ProcedureDiff::default();
673
674 let from_map: HashMap<_, _> = from.iter().map(|p| (p.qualified_name(), p)).collect();
675 let to_map: HashMap<_, _> = to.iter().map(|p| (p.qualified_name(), p)).collect();
676
677 for (name, proc) in &to_map {
679 if !from_map.contains_key(name) {
680 diff.create.push((*proc).clone());
681 }
682 }
683
684 for (name, _) in &from_map {
686 if !to_map.contains_key(name) {
687 diff.drop.push(name.clone());
688 }
689 }
690
691 for (name, new_proc) in &to_map {
693 if let Some(old_proc) = from_map.get(name) {
694 if old_proc.has_changed(new_proc) {
695 let changes = detect_procedure_changes(old_proc, new_proc);
696 diff.alter.push(ProcedureAlterDiff {
697 old: (*old_proc).clone(),
698 new: (*new_proc).clone(),
699 changes,
700 });
701 }
702 }
703 }
704
705 diff
706 }
707
708 pub fn diff_triggers(from: &[TriggerDefinition], to: &[TriggerDefinition]) -> ProcedureDiff {
710 let mut diff = ProcedureDiff::default();
711
712 let from_map: HashMap<_, _> = from.iter().map(|t| (t.qualified_name(), t)).collect();
713 let to_map: HashMap<_, _> = to.iter().map(|t| (t.qualified_name(), t)).collect();
714
715 for (name, trigger) in &to_map {
717 if !from_map.contains_key(name) {
718 diff.create_triggers.push((*trigger).clone());
719 }
720 }
721
722 for (name, _) in &from_map {
724 if !to_map.contains_key(name) {
725 diff.drop_triggers.push(name.clone());
726 }
727 }
728
729 for (name, new_trigger) in &to_map {
731 if let Some(old_trigger) = from_map.get(name) {
732 if old_trigger != new_trigger {
733 diff.alter_triggers.push(TriggerAlterDiff {
734 old: (*old_trigger).clone(),
735 new: (*new_trigger).clone(),
736 });
737 }
738 }
739 }
740
741 diff
742 }
743}
744
745fn detect_procedure_changes(
746 old: &ProcedureDefinition,
747 new: &ProcedureDefinition,
748) -> Vec<ProcedureChange> {
749 let mut changes = Vec::new();
750
751 if old.body != new.body {
752 changes.push(ProcedureChange::Body);
753 }
754 if old.parameters != new.parameters {
755 changes.push(ProcedureChange::Parameters);
756 }
757 if old.return_type != new.return_type || old.returns_set != new.returns_set {
758 changes.push(ProcedureChange::ReturnType);
759 }
760 if old.language != new.language {
761 changes.push(ProcedureChange::Language);
762 }
763 if old.volatility != new.volatility {
764 changes.push(ProcedureChange::Volatility);
765 }
766 if old.security_definer != new.security_definer {
767 changes.push(ProcedureChange::SecurityDefiner);
768 }
769 if old.cost != new.cost {
770 changes.push(ProcedureChange::Cost);
771 }
772 if old.parallel != new.parallel {
773 changes.push(ProcedureChange::Parallel);
774 }
775
776 changes
777}
778
779pub struct ProcedureSqlGenerator {
785 pub db_type: DatabaseType,
787}
788
789#[derive(Debug, Clone, Copy, PartialEq, Eq)]
791pub enum DatabaseType {
792 PostgreSQL,
793 MySQL,
794 SQLite,
795 MSSQL,
796}
797
798impl ProcedureSqlGenerator {
799 pub fn new(db_type: DatabaseType) -> Self {
801 Self { db_type }
802 }
803
804 pub fn create_procedure(&self, proc: &ProcedureDefinition) -> String {
806 match self.db_type {
807 DatabaseType::PostgreSQL => self.create_postgres_procedure(proc),
808 DatabaseType::MySQL => self.create_mysql_procedure(proc),
809 DatabaseType::SQLite => self.create_sqlite_udf(proc),
810 DatabaseType::MSSQL => self.create_mssql_procedure(proc),
811 }
812 }
813
814 pub fn drop_procedure(&self, proc: &ProcedureDefinition) -> String {
816 let obj_type = if proc.is_function {
817 "FUNCTION"
818 } else {
819 "PROCEDURE"
820 };
821 let name = proc.qualified_name();
822
823 match self.db_type {
824 DatabaseType::PostgreSQL => {
825 let params = proc
827 .parameters
828 .iter()
829 .map(|p| p.data_type.as_str())
830 .collect::<Vec<_>>()
831 .join(", ");
832 format!("DROP {} IF EXISTS {}({});", obj_type, name, params)
833 }
834 DatabaseType::MySQL => {
835 format!("DROP {} IF EXISTS {};", obj_type, name)
836 }
837 DatabaseType::SQLite => {
838 format!("-- SQLite: Remove UDF registration for {}", name)
840 }
841 DatabaseType::MSSQL => {
842 format!(
843 "IF OBJECT_ID('{}', '{}') IS NOT NULL DROP {} {};",
844 name,
845 if proc.is_function { "FN" } else { "P" },
846 obj_type,
847 name
848 )
849 }
850 }
851 }
852
853 pub fn alter_procedure(&self, diff: &ProcedureAlterDiff) -> String {
855 match self.db_type {
857 DatabaseType::PostgreSQL => {
858 self.create_postgres_procedure(&diff.new)
860 }
861 DatabaseType::MySQL => {
862 format!(
864 "{}\n{}",
865 self.drop_procedure(&diff.old),
866 self.create_mysql_procedure(&diff.new)
867 )
868 }
869 DatabaseType::SQLite => self.create_sqlite_udf(&diff.new),
870 DatabaseType::MSSQL => {
871 self.alter_mssql_procedure(&diff.new)
873 }
874 }
875 }
876
877 pub fn create_trigger(&self, trigger: &TriggerDefinition) -> String {
879 match self.db_type {
880 DatabaseType::PostgreSQL => self.create_postgres_trigger(trigger),
881 DatabaseType::MySQL => self.create_mysql_trigger(trigger),
882 DatabaseType::SQLite => self.create_sqlite_trigger(trigger),
883 DatabaseType::MSSQL => self.create_mssql_trigger(trigger),
884 }
885 }
886
887 pub fn drop_trigger(&self, trigger: &TriggerDefinition) -> String {
889 match self.db_type {
890 DatabaseType::PostgreSQL => {
891 format!(
892 "DROP TRIGGER IF EXISTS {} ON {};",
893 trigger.name, trigger.table
894 )
895 }
896 DatabaseType::MySQL => {
897 format!("DROP TRIGGER IF EXISTS {};", trigger.name)
898 }
899 DatabaseType::SQLite => {
900 format!("DROP TRIGGER IF EXISTS {};", trigger.name)
901 }
902 DatabaseType::MSSQL => {
903 format!("DROP TRIGGER IF EXISTS {};", trigger.qualified_name())
904 }
905 }
906 }
907
908 fn create_postgres_procedure(&self, proc: &ProcedureDefinition) -> String {
910 let mut sql = String::new();
911
912 let obj_type = if proc.is_function {
913 "FUNCTION"
914 } else {
915 "PROCEDURE"
916 };
917 let or_replace = if proc.or_replace { "OR REPLACE " } else { "" };
918
919 sql.push_str(&format!(
920 "CREATE {}{} {} (",
921 or_replace,
922 obj_type,
923 proc.qualified_name()
924 ));
925
926 let params: Vec<String> = proc
928 .parameters
929 .iter()
930 .map(|p| {
931 let mode = match p.mode {
932 ParameterMode::In => "",
933 ParameterMode::Out => "OUT ",
934 ParameterMode::InOut => "INOUT ",
935 ParameterMode::Variadic => "VARIADIC ",
936 };
937 let default = p
938 .default
939 .as_ref()
940 .map(|d| format!(" DEFAULT {}", d))
941 .unwrap_or_default();
942 format!("{}{} {}{}", mode, p.name, p.data_type, default)
943 })
944 .collect();
945 sql.push_str(¶ms.join(", "));
946 sql.push_str(")\n");
947
948 if let Some(ref ret) = proc.return_type {
950 if proc.returns_set {
951 sql.push_str(&format!("RETURNS SETOF {}\n", ret));
952 } else {
953 sql.push_str(&format!("RETURNS {}\n", ret));
954 }
955 } else if !proc.return_columns.is_empty() {
956 let cols: Vec<String> = proc
957 .return_columns
958 .iter()
959 .map(|c| format!("{} {}", c.name, c.data_type))
960 .collect();
961 sql.push_str(&format!("RETURNS TABLE ({})\n", cols.join(", ")));
962 } else if proc.is_function {
963 sql.push_str("RETURNS void\n");
964 }
965
966 sql.push_str(&format!("LANGUAGE {}\n", proc.language.to_sql()));
968
969 sql.push_str(&format!("{}\n", proc.volatility.to_sql()));
971
972 if proc.security_definer {
974 sql.push_str("SECURITY DEFINER\n");
975 }
976
977 if let Some(cost) = proc.cost {
979 sql.push_str(&format!("COST {}\n", cost));
980 }
981
982 if proc.parallel != ParallelSafety::Unsafe {
984 sql.push_str(&format!("{}\n", proc.parallel.to_sql()));
985 }
986
987 sql.push_str(&format!("AS $$\n{}\n$$;", proc.body));
989
990 if let Some(ref comment) = proc.comment {
992 sql.push_str(&format!(
993 "\n\nCOMMENT ON {} {} IS '{}';",
994 obj_type,
995 proc.qualified_name(),
996 comment.replace('\'', "''")
997 ));
998 }
999
1000 sql
1001 }
1002
1003 fn create_postgres_trigger(&self, trigger: &TriggerDefinition) -> String {
1004 let mut sql = String::new();
1005
1006 let or_replace = if trigger.or_replace {
1007 "OR REPLACE "
1008 } else {
1009 ""
1010 };
1011
1012 sql.push_str(&format!("CREATE {}TRIGGER {}\n", or_replace, trigger.name));
1013
1014 sql.push_str(&format!("{} ", trigger.timing.to_sql()));
1016
1017 let events: Vec<&str> = trigger.events.iter().map(|e| e.to_sql()).collect();
1019 sql.push_str(&events.join(" OR "));
1020
1021 sql.push_str(&format!("\nON {}\n", trigger.table));
1023
1024 sql.push_str(&format!("{}\n", trigger.level.to_sql()));
1026
1027 if let Some(ref cond) = trigger.condition {
1029 sql.push_str(&format!("WHEN ({})\n", cond));
1030 }
1031
1032 let args = if trigger.function_args.is_empty() {
1034 String::new()
1035 } else {
1036 trigger.function_args.join(", ")
1037 };
1038 sql.push_str(&format!("EXECUTE FUNCTION {}({});", trigger.function, args));
1039
1040 sql
1041 }
1042
1043 fn create_mysql_procedure(&self, proc: &ProcedureDefinition) -> String {
1045 let mut sql = String::new();
1046
1047 let obj_type = if proc.is_function {
1048 "FUNCTION"
1049 } else {
1050 "PROCEDURE"
1051 };
1052
1053 sql.push_str(&format!("CREATE {} {} (", obj_type, proc.qualified_name()));
1055
1056 let params: Vec<String> = proc
1058 .parameters
1059 .iter()
1060 .map(|p| {
1061 let mode = match p.mode {
1062 ParameterMode::In => "IN ",
1063 ParameterMode::Out => "OUT ",
1064 ParameterMode::InOut => "INOUT ",
1065 ParameterMode::Variadic => "",
1066 };
1067 format!("{}{} {}", mode, p.name, p.data_type)
1068 })
1069 .collect();
1070 sql.push_str(¶ms.join(", "));
1071 sql.push_str(")\n");
1072
1073 if proc.is_function {
1075 if let Some(ref ret) = proc.return_type {
1076 sql.push_str(&format!("RETURNS {}\n", ret));
1077 }
1078 }
1079
1080 if proc.volatility == Volatility::Immutable {
1082 sql.push_str("DETERMINISTIC\n");
1083 } else {
1084 sql.push_str("NOT DETERMINISTIC\n");
1085 }
1086
1087 if proc.security_definer {
1088 sql.push_str("SQL SECURITY DEFINER\n");
1089 }
1090
1091 sql.push_str(&format!("BEGIN\n{}\nEND;", proc.body));
1093
1094 sql
1095 }
1096
1097 fn create_mysql_trigger(&self, trigger: &TriggerDefinition) -> String {
1098 let mut sql = String::new();
1099
1100 sql.push_str(&format!("CREATE TRIGGER {}\n", trigger.name));
1101
1102 sql.push_str(&format!("{} ", trigger.timing.to_sql()));
1104
1105 if let Some(event) = trigger.events.first() {
1107 sql.push_str(&format!("{}\n", event.to_sql()));
1108 }
1109
1110 sql.push_str(&format!("ON {}\n", trigger.table));
1112
1113 sql.push_str(&format!("{}\n", trigger.level.to_sql()));
1115
1116 sql.push_str(&format!("BEGIN\n CALL {}();\nEND;", trigger.function));
1118
1119 sql
1120 }
1121
1122 fn create_sqlite_udf(&self, proc: &ProcedureDefinition) -> String {
1124 format!(
1126 "-- SQLite UDF: {} must be registered via rusqlite::create_scalar_function\n\
1127 -- Parameters: {}\n\
1128 -- Body:\n-- {}",
1129 proc.name,
1130 proc.parameters
1131 .iter()
1132 .map(|p| format!("{}: {}", p.name, p.data_type))
1133 .collect::<Vec<_>>()
1134 .join(", "),
1135 proc.body.replace('\n', "\n-- ")
1136 )
1137 }
1138
1139 fn create_sqlite_trigger(&self, trigger: &TriggerDefinition) -> String {
1140 let mut sql = String::new();
1141
1142 sql.push_str(&format!("CREATE TRIGGER IF NOT EXISTS {}\n", trigger.name));
1143
1144 sql.push_str(&format!("{} ", trigger.timing.to_sql()));
1146
1147 let events: Vec<&str> = trigger.events.iter().map(|e| e.to_sql()).collect();
1149 sql.push_str(&events.join(" OR "));
1150
1151 sql.push_str(&format!("\nON {}\n", trigger.table));
1153
1154 sql.push_str("FOR EACH ROW\n");
1156
1157 if let Some(ref cond) = trigger.condition {
1159 sql.push_str(&format!("WHEN {}\n", cond));
1160 }
1161
1162 sql.push_str(&format!("BEGIN\n SELECT {}();\nEND;", trigger.function));
1164
1165 sql
1166 }
1167
1168 fn create_mssql_procedure(&self, proc: &ProcedureDefinition) -> String {
1170 let mut sql = String::new();
1171
1172 let obj_type = if proc.is_function {
1173 "FUNCTION"
1174 } else {
1175 "PROCEDURE"
1176 };
1177
1178 sql.push_str(&format!("CREATE {} {} (", obj_type, proc.qualified_name()));
1179
1180 let params: Vec<String> = proc
1182 .parameters
1183 .iter()
1184 .map(|p| {
1185 let output = if p.mode == ParameterMode::Out || p.mode == ParameterMode::InOut {
1186 " OUTPUT"
1187 } else {
1188 ""
1189 };
1190 format!("@{} {}{}", p.name, p.data_type, output)
1191 })
1192 .collect();
1193 sql.push_str(¶ms.join(", "));
1194 sql.push_str(")\n");
1195
1196 if proc.is_function {
1198 if let Some(ref ret) = proc.return_type {
1199 sql.push_str(&format!("RETURNS {}\n", ret));
1200 }
1201 }
1202
1203 sql.push_str("AS\nBEGIN\n");
1204 sql.push_str(&proc.body);
1205 sql.push_str("\nEND;");
1206
1207 sql
1208 }
1209
1210 fn alter_mssql_procedure(&self, proc: &ProcedureDefinition) -> String {
1211 self.create_mssql_procedure(proc)
1213 .replacen("CREATE", "ALTER", 1)
1214 }
1215
1216 fn create_mssql_trigger(&self, trigger: &TriggerDefinition) -> String {
1217 let mut sql = String::new();
1218
1219 sql.push_str(&format!(
1220 "CREATE TRIGGER {}\nON {}\n",
1221 trigger.qualified_name(),
1222 trigger.table
1223 ));
1224
1225 sql.push_str(&format!("{} ", trigger.timing.to_sql()));
1227
1228 let events: Vec<&str> = trigger.events.iter().map(|e| e.to_sql()).collect();
1230 sql.push_str(&events.join(", "));
1231
1232 sql.push_str("\nAS\nBEGIN\n");
1233 sql.push_str(&format!(" EXEC {};\n", trigger.function));
1234 sql.push_str("END;");
1235
1236 sql
1237 }
1238
1239 pub fn generate_migration(&self, diff: &ProcedureDiff) -> MigrationSql {
1241 let mut up = Vec::new();
1242 let mut down = Vec::new();
1243
1244 for proc in &diff.create {
1246 up.push(self.create_procedure(proc));
1247 down.push(self.drop_procedure(proc));
1248 }
1249
1250 for name in &diff.drop {
1252 up.push(format!("DROP FUNCTION IF EXISTS {};", name));
1254 down.push(format!("-- Recreate {} (original definition needed)", name));
1256 }
1257
1258 for alter in &diff.alter {
1260 up.push(self.alter_procedure(alter));
1261 down.push(self.create_procedure(&alter.old));
1263 }
1264
1265 for trigger in &diff.create_triggers {
1267 up.push(self.create_trigger(trigger));
1268 down.push(self.drop_trigger(trigger));
1269 }
1270
1271 for name in &diff.drop_triggers {
1273 up.push(format!("DROP TRIGGER IF EXISTS {};", name));
1274 down.push(format!(
1275 "-- Recreate trigger {} (original definition needed)",
1276 name
1277 ));
1278 }
1279
1280 for alter in &diff.alter_triggers {
1282 up.push(self.drop_trigger(&alter.old));
1283 up.push(self.create_trigger(&alter.new));
1284 down.push(self.drop_trigger(&alter.new));
1285 down.push(self.create_trigger(&alter.old));
1286 }
1287
1288 MigrationSql {
1289 up: up.join("\n\n"),
1290 down: down.join("\n\n"),
1291 }
1292 }
1293}
1294
1295#[derive(Debug, Clone)]
1297pub struct MigrationSql {
1298 pub up: String,
1300 pub down: String,
1302}
1303
1304#[derive(Debug, Clone, Default, Serialize, Deserialize)]
1310pub struct ProcedureStore {
1311 pub procedures: HashMap<String, ProcedureDefinition>,
1313 pub triggers: HashMap<String, TriggerDefinition>,
1315 pub events: HashMap<String, ScheduledEvent>,
1317 pub history: Vec<ProcedureHistoryEntry>,
1319}
1320
1321#[derive(Debug, Clone, Serialize, Deserialize)]
1323pub struct ProcedureHistoryEntry {
1324 pub timestamp: String,
1326 pub migration_id: String,
1328 pub change_type: ChangeType,
1330 pub name: String,
1332 pub old_checksum: Option<String>,
1334 pub new_checksum: Option<String>,
1336}
1337
1338#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1340pub enum ChangeType {
1341 Create,
1342 Alter,
1343 Drop,
1344}
1345
1346impl ProcedureStore {
1347 pub fn new() -> Self {
1349 Self::default()
1350 }
1351
1352 pub fn add_procedure(&mut self, proc: ProcedureDefinition) {
1354 self.procedures.insert(proc.qualified_name(), proc);
1355 }
1356
1357 pub fn add_trigger(&mut self, trigger: TriggerDefinition) {
1359 self.triggers.insert(trigger.qualified_name(), trigger);
1360 }
1361
1362 pub fn procedures_list(&self) -> Vec<&ProcedureDefinition> {
1364 self.procedures.values().collect()
1365 }
1366
1367 pub fn triggers_list(&self) -> Vec<&TriggerDefinition> {
1369 self.triggers.values().collect()
1370 }
1371
1372 pub fn save(&self, path: &std::path::Path) -> std::io::Result<()> {
1374 let content = toml::to_string_pretty(self)
1375 .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
1376 std::fs::write(path, content)
1377 }
1378
1379 pub fn load(path: &std::path::Path) -> std::io::Result<Self> {
1381 let content = std::fs::read_to_string(path)?;
1382 toml::from_str(&content).map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))
1383 }
1384
1385 pub fn add_event(&mut self, event: ScheduledEvent) {
1387 self.events.insert(event.name.clone(), event);
1388 }
1389
1390 pub fn events_list(&self) -> Vec<&ScheduledEvent> {
1392 self.events.values().collect()
1393 }
1394}
1395
1396#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1402pub struct ScheduledEvent {
1403 pub name: String,
1405 pub schema: Option<String>,
1407 pub schedule: EventSchedule,
1409 pub body: String,
1411 pub enabled: bool,
1413 pub on_completion: OnCompletion,
1415 pub comment: Option<String>,
1417 pub starts: Option<String>,
1419 pub ends: Option<String>,
1421 pub definer: Option<String>,
1423}
1424
1425impl Default for ScheduledEvent {
1426 fn default() -> Self {
1427 Self {
1428 name: String::new(),
1429 schema: None,
1430 schedule: EventSchedule::Once,
1431 body: String::new(),
1432 enabled: true,
1433 on_completion: OnCompletion::Drop,
1434 comment: None,
1435 starts: None,
1436 ends: None,
1437 definer: None,
1438 }
1439 }
1440}
1441
1442impl ScheduledEvent {
1443 pub fn new(name: impl Into<String>) -> Self {
1445 Self {
1446 name: name.into(),
1447 ..Default::default()
1448 }
1449 }
1450
1451 pub fn schema(mut self, schema: impl Into<String>) -> Self {
1453 self.schema = Some(schema.into());
1454 self
1455 }
1456
1457 pub fn at(mut self, datetime: impl Into<String>) -> Self {
1459 self.schedule = EventSchedule::At(datetime.into());
1460 self
1461 }
1462
1463 pub fn every(mut self, interval: EventInterval) -> Self {
1465 self.schedule = EventSchedule::Every(interval);
1466 self
1467 }
1468
1469 pub fn body(mut self, body: impl Into<String>) -> Self {
1471 self.body = body.into();
1472 self
1473 }
1474
1475 pub fn disabled(mut self) -> Self {
1477 self.enabled = false;
1478 self
1479 }
1480
1481 pub fn preserve(mut self) -> Self {
1483 self.on_completion = OnCompletion::Preserve;
1484 self
1485 }
1486
1487 pub fn starts(mut self, datetime: impl Into<String>) -> Self {
1489 self.starts = Some(datetime.into());
1490 self
1491 }
1492
1493 pub fn ends(mut self, datetime: impl Into<String>) -> Self {
1495 self.ends = Some(datetime.into());
1496 self
1497 }
1498
1499 pub fn comment(mut self, comment: impl Into<String>) -> Self {
1501 self.comment = Some(comment.into());
1502 self
1503 }
1504
1505 pub fn qualified_name(&self) -> String {
1507 match &self.schema {
1508 Some(schema) => format!("{}.{}", schema, self.name),
1509 None => self.name.clone(),
1510 }
1511 }
1512}
1513
1514#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1516pub enum EventSchedule {
1517 Once,
1519 At(String),
1521 Every(EventInterval),
1523 Cron(String),
1525}
1526
1527#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1529pub struct EventInterval {
1530 pub quantity: u32,
1532 pub unit: IntervalUnit,
1534}
1535
1536impl EventInterval {
1537 pub fn new(quantity: u32, unit: IntervalUnit) -> Self {
1539 Self { quantity, unit }
1540 }
1541
1542 pub fn seconds(n: u32) -> Self {
1544 Self::new(n, IntervalUnit::Second)
1545 }
1546
1547 pub fn minutes(n: u32) -> Self {
1549 Self::new(n, IntervalUnit::Minute)
1550 }
1551
1552 pub fn hours(n: u32) -> Self {
1554 Self::new(n, IntervalUnit::Hour)
1555 }
1556
1557 pub fn days(n: u32) -> Self {
1559 Self::new(n, IntervalUnit::Day)
1560 }
1561
1562 pub fn weeks(n: u32) -> Self {
1564 Self::new(n, IntervalUnit::Week)
1565 }
1566
1567 pub fn months(n: u32) -> Self {
1569 Self::new(n, IntervalUnit::Month)
1570 }
1571
1572 pub fn to_mysql(&self) -> String {
1574 format!("{} {}", self.quantity, self.unit.to_mysql())
1575 }
1576}
1577
1578#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1580pub enum IntervalUnit {
1581 Second,
1582 Minute,
1583 Hour,
1584 Day,
1585 Week,
1586 Month,
1587 Quarter,
1588 Year,
1589}
1590
1591impl IntervalUnit {
1592 pub fn to_mysql(&self) -> &'static str {
1594 match self {
1595 Self::Second => "SECOND",
1596 Self::Minute => "MINUTE",
1597 Self::Hour => "HOUR",
1598 Self::Day => "DAY",
1599 Self::Week => "WEEK",
1600 Self::Month => "MONTH",
1601 Self::Quarter => "QUARTER",
1602 Self::Year => "YEAR",
1603 }
1604 }
1605}
1606
1607#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
1609pub enum OnCompletion {
1610 #[default]
1612 Drop,
1613 Preserve,
1615}
1616
1617#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1623pub struct SqlAgentJob {
1624 pub name: String,
1626 pub description: Option<String>,
1628 pub category: Option<String>,
1630 pub owner: Option<String>,
1632 pub enabled: bool,
1634 pub steps: Vec<JobStep>,
1636 pub schedules: Vec<JobSchedule>,
1638 pub notify_level: NotifyLevel,
1640}
1641
1642impl Default for SqlAgentJob {
1643 fn default() -> Self {
1644 Self {
1645 name: String::new(),
1646 description: None,
1647 category: None,
1648 owner: None,
1649 enabled: true,
1650 steps: Vec::new(),
1651 schedules: Vec::new(),
1652 notify_level: NotifyLevel::OnFailure,
1653 }
1654 }
1655}
1656
1657impl SqlAgentJob {
1658 pub fn new(name: impl Into<String>) -> Self {
1660 Self {
1661 name: name.into(),
1662 ..Default::default()
1663 }
1664 }
1665
1666 pub fn description(mut self, desc: impl Into<String>) -> Self {
1668 self.description = Some(desc.into());
1669 self
1670 }
1671
1672 pub fn step(mut self, step: JobStep) -> Self {
1674 self.steps.push(step);
1675 self
1676 }
1677
1678 pub fn tsql_step(mut self, name: impl Into<String>, sql: impl Into<String>) -> Self {
1680 self.steps.push(JobStep {
1681 name: name.into(),
1682 step_type: StepType::TSql,
1683 command: sql.into(),
1684 database: None,
1685 on_success: StepAction::GoToNextStep,
1686 on_failure: StepAction::QuitWithFailure,
1687 retry_attempts: 0,
1688 retry_interval: 0,
1689 });
1690 self
1691 }
1692
1693 pub fn schedule(mut self, schedule: JobSchedule) -> Self {
1695 self.schedules.push(schedule);
1696 self
1697 }
1698
1699 pub fn disabled(mut self) -> Self {
1701 self.enabled = false;
1702 self
1703 }
1704}
1705
1706#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1708pub struct JobStep {
1709 pub name: String,
1711 pub step_type: StepType,
1713 pub command: String,
1715 pub database: Option<String>,
1717 pub on_success: StepAction,
1719 pub on_failure: StepAction,
1721 pub retry_attempts: u32,
1723 pub retry_interval: u32,
1725}
1726
1727#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
1729pub enum StepType {
1730 #[default]
1731 TSql,
1732 CmdExec,
1733 PowerShell,
1734 Ssis,
1735 Ssas,
1736 Ssrs,
1737}
1738
1739impl StepType {
1740 pub fn subsystem(&self) -> &'static str {
1742 match self {
1743 Self::TSql => "TSQL",
1744 Self::CmdExec => "CmdExec",
1745 Self::PowerShell => "PowerShell",
1746 Self::Ssis => "SSIS",
1747 Self::Ssas => "AnalysisCommand",
1748 Self::Ssrs => "Reporting Services Command",
1749 }
1750 }
1751}
1752
1753#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
1755pub enum StepAction {
1756 #[default]
1757 GoToNextStep,
1758 GoToStep(u32),
1759 QuitWithSuccess,
1760 QuitWithFailure,
1761}
1762
1763impl StepAction {
1764 pub fn action_id(&self) -> u32 {
1766 match self {
1767 Self::GoToNextStep => 3,
1768 Self::GoToStep(_) => 4,
1769 Self::QuitWithSuccess => 1,
1770 Self::QuitWithFailure => 2,
1771 }
1772 }
1773}
1774
1775#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1777pub struct JobSchedule {
1778 pub name: String,
1780 pub frequency: ScheduleFrequency,
1782 pub active_start_time: Option<String>,
1784 pub start_date: Option<String>,
1786 pub end_date: Option<String>,
1788 pub enabled: bool,
1790}
1791
1792impl Default for JobSchedule {
1793 fn default() -> Self {
1794 Self {
1795 name: String::new(),
1796 frequency: ScheduleFrequency::Daily { every_n_days: 1 },
1797 active_start_time: None,
1798 start_date: None,
1799 end_date: None,
1800 enabled: true,
1801 }
1802 }
1803}
1804
1805impl JobSchedule {
1806 pub fn new(name: impl Into<String>) -> Self {
1808 Self {
1809 name: name.into(),
1810 ..Default::default()
1811 }
1812 }
1813
1814 pub fn once(mut self) -> Self {
1816 self.frequency = ScheduleFrequency::Once;
1817 self
1818 }
1819
1820 pub fn daily(mut self, every_n_days: u32) -> Self {
1822 self.frequency = ScheduleFrequency::Daily { every_n_days };
1823 self
1824 }
1825
1826 pub fn weekly(mut self, days: Vec<Weekday>) -> Self {
1828 self.frequency = ScheduleFrequency::Weekly {
1829 every_n_weeks: 1,
1830 days,
1831 };
1832 self
1833 }
1834
1835 pub fn monthly(mut self, day_of_month: u32) -> Self {
1837 self.frequency = ScheduleFrequency::Monthly {
1838 every_n_months: 1,
1839 day_of_month,
1840 };
1841 self
1842 }
1843
1844 pub fn at(mut self, time: impl Into<String>) -> Self {
1846 self.active_start_time = Some(time.into());
1847 self
1848 }
1849}
1850
1851#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1853pub enum ScheduleFrequency {
1854 Once,
1855 Daily {
1856 every_n_days: u32,
1857 },
1858 Weekly {
1859 every_n_weeks: u32,
1860 days: Vec<Weekday>,
1861 },
1862 Monthly {
1863 every_n_months: u32,
1864 day_of_month: u32,
1865 },
1866 OnIdle,
1867 OnAgentStart,
1868}
1869
1870impl ScheduleFrequency {
1871 pub fn freq_type(&self) -> u32 {
1873 match self {
1874 Self::Once => 1,
1875 Self::Daily { .. } => 4,
1876 Self::Weekly { .. } => 8,
1877 Self::Monthly { .. } => 16,
1878 Self::OnIdle => 128,
1879 Self::OnAgentStart => 64,
1880 }
1881 }
1882
1883 pub fn freq_interval(&self) -> u32 {
1885 match self {
1886 Self::Once => 0,
1887 Self::Daily { every_n_days } => *every_n_days,
1888 Self::Weekly { days, .. } => days.iter().map(|d| d.bitmask()).fold(0, |acc, m| acc | m),
1889 Self::Monthly { day_of_month, .. } => *day_of_month,
1890 Self::OnIdle | Self::OnAgentStart => 0,
1891 }
1892 }
1893}
1894
1895#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
1897pub enum Weekday {
1898 Sunday,
1899 Monday,
1900 Tuesday,
1901 Wednesday,
1902 Thursday,
1903 Friday,
1904 Saturday,
1905}
1906
1907impl Weekday {
1908 pub fn bitmask(&self) -> u32 {
1910 match self {
1911 Self::Sunday => 1,
1912 Self::Monday => 2,
1913 Self::Tuesday => 4,
1914 Self::Wednesday => 8,
1915 Self::Thursday => 16,
1916 Self::Friday => 32,
1917 Self::Saturday => 64,
1918 }
1919 }
1920}
1921
1922#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
1924pub enum NotifyLevel {
1925 Never,
1926 OnSuccess,
1927 #[default]
1928 OnFailure,
1929 Always,
1930}
1931
1932#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
1938pub struct AtlasTrigger {
1939 pub name: String,
1941 pub trigger_type: AtlasTriggerType,
1943 pub enabled: bool,
1945 pub function_name: String,
1947 pub match_expression: Option<String>,
1949 pub project: Option<String>,
1951 pub full_document: bool,
1953 pub full_document_before_change: bool,
1955}
1956
1957impl Default for AtlasTrigger {
1958 fn default() -> Self {
1959 Self {
1960 name: String::new(),
1961 trigger_type: AtlasTriggerType::Database {
1962 database: String::new(),
1963 collection: String::new(),
1964 operation_types: Vec::new(),
1965 },
1966 enabled: true,
1967 function_name: String::new(),
1968 match_expression: None,
1969 project: None,
1970 full_document: false,
1971 full_document_before_change: false,
1972 }
1973 }
1974}
1975
1976impl AtlasTrigger {
1977 pub fn new(name: impl Into<String>) -> Self {
1979 Self {
1980 name: name.into(),
1981 ..Default::default()
1982 }
1983 }
1984
1985 pub fn database(
1987 mut self,
1988 database: impl Into<String>,
1989 collection: impl Into<String>,
1990 operations: Vec<AtlasOperation>,
1991 ) -> Self {
1992 self.trigger_type = AtlasTriggerType::Database {
1993 database: database.into(),
1994 collection: collection.into(),
1995 operation_types: operations,
1996 };
1997 self
1998 }
1999
2000 pub fn scheduled(mut self, cron: impl Into<String>) -> Self {
2002 self.trigger_type = AtlasTriggerType::Scheduled {
2003 schedule: cron.into(),
2004 };
2005 self
2006 }
2007
2008 pub fn authentication(mut self, operation: AuthOperation) -> Self {
2010 self.trigger_type = AtlasTriggerType::Authentication { operation };
2011 self
2012 }
2013
2014 pub fn function(mut self, name: impl Into<String>) -> Self {
2016 self.function_name = name.into();
2017 self
2018 }
2019
2020 pub fn full_document(mut self) -> Self {
2022 self.full_document = true;
2023 self
2024 }
2025
2026 pub fn disabled(mut self) -> Self {
2028 self.enabled = false;
2029 self
2030 }
2031}
2032
2033#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
2035pub enum AtlasTriggerType {
2036 Database {
2038 database: String,
2039 collection: String,
2040 operation_types: Vec<AtlasOperation>,
2041 },
2042 Scheduled { schedule: String },
2044 Authentication { operation: AuthOperation },
2046}
2047
2048#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
2050pub enum AtlasOperation {
2051 Insert,
2052 Update,
2053 Replace,
2054 Delete,
2055}
2056
2057impl AtlasOperation {
2058 pub fn as_str(&self) -> &'static str {
2060 match self {
2061 Self::Insert => "INSERT",
2062 Self::Update => "UPDATE",
2063 Self::Replace => "REPLACE",
2064 Self::Delete => "DELETE",
2065 }
2066 }
2067}
2068
2069#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
2071pub enum AuthOperation {
2072 Create,
2073 Login,
2074 Delete,
2075}
2076
2077impl ProcedureSqlGenerator {
2082 pub fn create_event(&self, event: &ScheduledEvent) -> String {
2084 match self.db_type {
2085 DatabaseType::MySQL => self.create_mysql_event(event),
2086 DatabaseType::MSSQL => {
2087 format!(
2089 "-- Use SqlAgentJob for MSSQL scheduled tasks\n\
2090 -- Event: {}\n\
2091 -- Schedule: {:?}",
2092 event.name, event.schedule
2093 )
2094 }
2095 _ => format!("-- Events not supported for {:?}", self.db_type),
2096 }
2097 }
2098
2099 pub fn drop_event(&self, event: &ScheduledEvent) -> String {
2101 match self.db_type {
2102 DatabaseType::MySQL => {
2103 format!("DROP EVENT IF EXISTS {};", event.qualified_name())
2104 }
2105 _ => format!("-- DROP EVENT not supported for {:?}", self.db_type),
2106 }
2107 }
2108
2109 fn create_mysql_event(&self, event: &ScheduledEvent) -> String {
2110 let mut sql = String::new();
2111
2112 sql.push_str(&format!(
2113 "CREATE EVENT IF NOT EXISTS {}\n",
2114 event.qualified_name()
2115 ));
2116
2117 match &event.schedule {
2119 EventSchedule::Once => sql.push_str("ON SCHEDULE AT CURRENT_TIMESTAMP\n"),
2120 EventSchedule::At(datetime) => {
2121 sql.push_str(&format!("ON SCHEDULE AT '{}'\n", datetime))
2122 }
2123 EventSchedule::Every(interval) => {
2124 sql.push_str(&format!("ON SCHEDULE EVERY {}\n", interval.to_mysql()));
2125 if let Some(ref starts) = event.starts {
2126 sql.push_str(&format!("STARTS '{}'\n", starts));
2127 }
2128 if let Some(ref ends) = event.ends {
2129 sql.push_str(&format!("ENDS '{}'\n", ends));
2130 }
2131 }
2132 EventSchedule::Cron(_) => {
2133 sql.push_str("ON SCHEDULE EVERY 1 DAY\n");
2135 }
2136 }
2137
2138 match event.on_completion {
2140 OnCompletion::Drop => sql.push_str("ON COMPLETION NOT PRESERVE\n"),
2141 OnCompletion::Preserve => sql.push_str("ON COMPLETION PRESERVE\n"),
2142 }
2143
2144 if event.enabled {
2146 sql.push_str("ENABLE\n");
2147 } else {
2148 sql.push_str("DISABLE\n");
2149 }
2150
2151 if let Some(ref comment) = event.comment {
2153 sql.push_str(&format!("COMMENT '{}'\n", comment.replace('\'', "''")));
2154 }
2155
2156 sql.push_str(&format!("DO\n{};", event.body));
2158
2159 sql
2160 }
2161
2162 pub fn create_sql_agent_job(&self, job: &SqlAgentJob) -> String {
2164 let mut sql = String::new();
2165
2166 sql.push_str("-- Create SQL Agent Job\n");
2168 sql.push_str("EXEC msdb.dbo.sp_add_job\n");
2169 sql.push_str(&format!(" @job_name = N'{}',\n", job.name));
2170
2171 if let Some(ref desc) = job.description {
2172 sql.push_str(&format!(" @description = N'{}',\n", desc));
2173 }
2174
2175 sql.push_str(&format!(
2176 " @enabled = {};\n\n",
2177 if job.enabled { 1 } else { 0 }
2178 ));
2179
2180 for (i, step) in job.steps.iter().enumerate() {
2182 sql.push_str(&format!("-- Step {}: {}\n", i + 1, step.name));
2183 sql.push_str("EXEC msdb.dbo.sp_add_jobstep\n");
2184 sql.push_str(&format!(" @job_name = N'{}',\n", job.name));
2185 sql.push_str(&format!(" @step_name = N'{}',\n", step.name));
2186 sql.push_str(&format!(
2187 " @subsystem = N'{}',\n",
2188 step.step_type.subsystem()
2189 ));
2190 sql.push_str(&format!(
2191 " @command = N'{}',\n",
2192 step.command.replace('\'', "''")
2193 ));
2194
2195 if let Some(ref db) = step.database {
2196 sql.push_str(&format!(" @database_name = N'{}',\n", db));
2197 }
2198
2199 sql.push_str(&format!(
2200 " @on_success_action = {},\n",
2201 step.on_success.action_id()
2202 ));
2203 sql.push_str(&format!(
2204 " @on_fail_action = {},\n",
2205 step.on_failure.action_id()
2206 ));
2207 sql.push_str(&format!(" @retry_attempts = {},\n", step.retry_attempts));
2208 sql.push_str(&format!(
2209 " @retry_interval = {};\n\n",
2210 step.retry_interval
2211 ));
2212 }
2213
2214 for schedule in &job.schedules {
2216 sql.push_str(&format!("-- Schedule: {}\n", schedule.name));
2217 sql.push_str("EXEC msdb.dbo.sp_add_schedule\n");
2218 sql.push_str(&format!(" @schedule_name = N'{}',\n", schedule.name));
2219 sql.push_str(&format!(
2220 " @enabled = {},\n",
2221 if schedule.enabled { 1 } else { 0 }
2222 ));
2223 sql.push_str(&format!(
2224 " @freq_type = {},\n",
2225 schedule.frequency.freq_type()
2226 ));
2227 sql.push_str(&format!(
2228 " @freq_interval = {};\n",
2229 schedule.frequency.freq_interval()
2230 ));
2231
2232 sql.push_str("\nEXEC msdb.dbo.sp_attach_schedule\n");
2234 sql.push_str(&format!(" @job_name = N'{}',\n", job.name));
2235 sql.push_str(&format!(" @schedule_name = N'{}';\n\n", schedule.name));
2236 }
2237
2238 sql.push_str("EXEC msdb.dbo.sp_add_jobserver\n");
2240 sql.push_str(&format!(" @job_name = N'{}',\n", job.name));
2241 sql.push_str(" @server_name = N'(LOCAL)';\n");
2242
2243 sql
2244 }
2245
2246 pub fn drop_sql_agent_job(&self, job_name: &str) -> String {
2248 format!(
2249 "IF EXISTS (SELECT 1 FROM msdb.dbo.sysjobs WHERE name = N'{}')\n\
2250 BEGIN\n\
2251 EXEC msdb.dbo.sp_delete_job @job_name = N'{}';\n\
2252 END;",
2253 job_name, job_name
2254 )
2255 }
2256}
2257
2258#[derive(Debug, Clone, Default)]
2264pub struct EventDiff {
2265 pub create: Vec<ScheduledEvent>,
2267 pub drop: Vec<String>,
2269 pub alter: Vec<EventAlterDiff>,
2271 pub create_jobs: Vec<SqlAgentJob>,
2273 pub drop_jobs: Vec<String>,
2275}
2276
2277#[derive(Debug, Clone)]
2279pub struct EventAlterDiff {
2280 pub old: ScheduledEvent,
2282 pub new: ScheduledEvent,
2284}
2285
2286impl EventDiff {
2287 pub fn is_empty(&self) -> bool {
2289 self.create.is_empty()
2290 && self.drop.is_empty()
2291 && self.alter.is_empty()
2292 && self.create_jobs.is_empty()
2293 && self.drop_jobs.is_empty()
2294 }
2295}
2296
2297#[cfg(test)]
2298mod tests {
2299 use super::*;
2300
2301 #[test]
2302 fn test_procedure_definition() {
2303 let proc = ProcedureDefinition::function("calculate_tax")
2304 .schema("public")
2305 .param("amount", "DECIMAL(10,2)")
2306 .param("rate", "DECIMAL(5,4)")
2307 .returns("DECIMAL(10,2)")
2308 .language(ProcedureLanguage::PlPgSql)
2309 .immutable()
2310 .body("RETURN amount * rate;");
2311
2312 assert_eq!(proc.name, "calculate_tax");
2313 assert_eq!(proc.qualified_name(), "public.calculate_tax");
2314 assert_eq!(proc.parameters.len(), 2);
2315 assert!(proc.is_function);
2316 }
2317
2318 #[test]
2319 fn test_trigger_definition() {
2320 let trigger = TriggerDefinition::new("audit_users", "users")
2321 .after()
2322 .on(vec![TriggerEvent::Insert, TriggerEvent::Update])
2323 .for_each_row()
2324 .execute("audit_trigger_fn");
2325
2326 assert_eq!(trigger.name, "audit_users");
2327 assert_eq!(trigger.table, "users");
2328 assert_eq!(trigger.timing, TriggerTiming::After);
2329 assert_eq!(trigger.events.len(), 2);
2330 }
2331
2332 #[test]
2333 fn test_procedure_diff() {
2334 let old = vec![
2335 ProcedureDefinition::function("fn1").body("v1"),
2336 ProcedureDefinition::function("fn2").body("v2"),
2337 ];
2338 let new = vec![
2339 ProcedureDefinition::function("fn1").body("v1_updated"),
2340 ProcedureDefinition::function("fn3").body("v3"),
2341 ];
2342
2343 let diff = ProcedureDiffer::diff(&old, &new);
2344
2345 assert_eq!(diff.create.len(), 1); assert_eq!(diff.drop.len(), 1); assert_eq!(diff.alter.len(), 1); }
2349
2350 #[test]
2351 fn test_postgres_sql_generation() {
2352 let generator = ProcedureSqlGenerator::new(DatabaseType::PostgreSQL);
2353
2354 let proc = ProcedureDefinition::function("greet")
2355 .param("name", "TEXT")
2356 .returns("TEXT")
2357 .language(ProcedureLanguage::Sql)
2358 .immutable()
2359 .body("SELECT 'Hello, ' || name || '!';");
2360
2361 let sql = generator.create_procedure(&proc);
2362 assert!(sql.contains("CREATE OR REPLACE"));
2363 assert!(sql.contains("FUNCTION"));
2364 assert!(sql.contains("RETURNS TEXT"));
2365 assert!(sql.contains("IMMUTABLE"));
2366 }
2367
2368 #[test]
2369 fn test_trigger_sql_generation() {
2370 let generator = ProcedureSqlGenerator::new(DatabaseType::PostgreSQL);
2371
2372 let trigger = TriggerDefinition::new("update_timestamp", "users")
2373 .before()
2374 .on(vec![TriggerEvent::Update])
2375 .for_each_row()
2376 .execute("set_updated_at");
2377
2378 let sql = generator.create_trigger(&trigger);
2379 assert!(sql.contains("CREATE OR REPLACE TRIGGER"));
2380 assert!(sql.contains("BEFORE UPDATE"));
2381 assert!(sql.contains("FOR EACH ROW"));
2382 }
2383
2384 #[test]
2385 fn test_procedure_store() {
2386 let mut store = ProcedureStore::new();
2387
2388 store.add_procedure(ProcedureDefinition::function("fn1").body("test"));
2389 store.add_trigger(TriggerDefinition::new("tr1", "table1"));
2390
2391 assert_eq!(store.procedures.len(), 1);
2392 assert_eq!(store.triggers.len(), 1);
2393 }
2394
2395 #[test]
2396 fn test_scheduled_event() {
2397 let event = ScheduledEvent::new("cleanup_old_data")
2398 .every(EventInterval::days(1))
2399 .body("DELETE FROM logs WHERE created_at < NOW() - INTERVAL 30 DAY")
2400 .preserve()
2401 .comment("Daily cleanup of old log entries");
2402
2403 assert_eq!(event.name, "cleanup_old_data");
2404 assert!(matches!(event.schedule, EventSchedule::Every(_)));
2405 assert_eq!(event.on_completion, OnCompletion::Preserve);
2406 }
2407
2408 #[test]
2409 fn test_mysql_event_sql() {
2410 let generator = ProcedureSqlGenerator::new(DatabaseType::MySQL);
2411
2412 let event = ScheduledEvent::new("hourly_stats")
2413 .every(EventInterval::hours(1))
2414 .body("CALL update_statistics()");
2415
2416 let sql = generator.create_event(&event);
2417 assert!(sql.contains("CREATE EVENT IF NOT EXISTS"));
2418 assert!(sql.contains("ON SCHEDULE EVERY 1 HOUR"));
2419 }
2420
2421 #[test]
2422 fn test_sql_agent_job() {
2423 let job = SqlAgentJob::new("nightly_backup")
2424 .description("Nightly database backup")
2425 .tsql_step(
2426 "Backup",
2427 "BACKUP DATABASE mydb TO DISK = 'C:\\backups\\mydb.bak'",
2428 )
2429 .schedule(JobSchedule::new("daily_2am").daily(1).at("02:00:00"));
2430
2431 assert_eq!(job.name, "nightly_backup");
2432 assert_eq!(job.steps.len(), 1);
2433 assert_eq!(job.schedules.len(), 1);
2434 }
2435
2436 #[test]
2437 fn test_sql_agent_job_sql() {
2438 let generator = ProcedureSqlGenerator::new(DatabaseType::MSSQL);
2439
2440 let job = SqlAgentJob::new("test_job").tsql_step("Step1", "SELECT 1");
2441
2442 let sql = generator.create_sql_agent_job(&job);
2443 assert!(sql.contains("sp_add_job"));
2444 assert!(sql.contains("sp_add_jobstep"));
2445 }
2446
2447 #[test]
2448 fn test_atlas_trigger() {
2449 let trigger = AtlasTrigger::new("on_user_create")
2450 .database("mydb", "users", vec![AtlasOperation::Insert])
2451 .function("handleUserCreate")
2452 .full_document();
2453
2454 assert_eq!(trigger.name, "on_user_create");
2455 assert!(trigger.full_document);
2456 assert!(matches!(
2457 trigger.trigger_type,
2458 AtlasTriggerType::Database { .. }
2459 ));
2460 }
2461}