1use std::{collections::HashSet, time::Duration};
2
3use delete::Delete;
4use log::debug;
5use upsert::Upsert;
6
7pub mod builder;
8pub mod upsert;
9pub mod delete;
10pub mod shutdown_service;
11
12fn remove_upsert_duplicates<T>(data: &mut Vec<T>) where T: Upsert<T> + Clone + Send + 'static {
13 let mut hash_set = HashSet::new();
14 data.sort_by(|x, y| y.modified_date().cmp(&x.modified_date()));
15 data.retain(|data| hash_set.insert(data.pkey().clone()))
16}
17
18fn remove_delete_duplicates<T>(data: &mut Vec<T>) where T: Delete<T> + Clone + Send + 'static {
19 let mut hash_set = HashSet::new();
20 data.sort_by(|x, y| y.modified_date().cmp(&x.modified_date()));
21 data.retain(|data| hash_set.insert(data.pkey().clone()))
22}
23
24fn split_vec_by_given<T>(mut data: Vec<T>, hundreds: usize, tens: usize, single_digit: usize) -> Vec<Vec<T>> where T: Upsert<T> + Clone + Send +'static {
25 let mut results = vec![];
26
27 for _hundred in 0..hundreds {
28 let (data_1, data_2) = data.split_at(100);
29 results.push(data_1.to_vec());
30 data = data_2.to_vec();
31 }
32
33 for _ten in 0..tens {
34 let (data_1, data_2) = data.split_at(10);
35 results.push(data_1.to_vec());
36 data = data_2.to_vec();
37 }
38
39 if single_digit != data.len() {
40 panic!("Unreachable logic reached")
41 } else {
42 if !data.is_empty() {
43 results.push(data);
44 }
45 return results;
46 }
47}
48
49fn split_vec<T>(data: Vec<T>) -> Vec<Vec<T>> where T: Upsert<T> + Clone + Send + 'static {
50 let hundreds = data.len() / 100;
51 let hundreds_remainder = data.len() % 100;
52
53 let tens = hundreds_remainder / 10;
54 let tens_remainder = hundreds_remainder % 10;
55
56
57 split_vec_by_given(data, hundreds, tens, tens_remainder)
58}
59
60async fn introduce_lag(lag: u64) {
61 debug!("introducing lag: {}ms", lag);
62 tokio::time::sleep(Duration::from_millis(lag)).await;
63 debug!("introduced lag complete");
64}
65
66
67
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_trait::async_trait;
    use chrono::{DateTime, NaiveDateTime};
    use tokio::sync::mpsc;
    use tokio::time;
    use tokio_postgres::{Client, Error, Statement};
    use tokio_util::sync::CancellationToken;

    use crate::builder::{support::QueryHolderBuilder, QuickStreamBuilder};
    use crate::Upsert;

    /// Statement text placed in every query slot of the holder.
    /// `MockData::upsert` ignores the prepared statement, so only its validity against the
    /// target database matters (presumably it is prepared by the stream — TODO confirm).
    const TEMPLATE_QUERY: &str = "INSERT INTO trax_production.ftp_current (ftpc_tripplannumber, ftpc_tripplanversion, ftpc_scheduleeventseq, ftpc_scheduleeventcode, ftpc_scheduleeventtype, ftpc_scheduleeventcity, ftpc_scheduleeventstate, ftpc_schedulerailcarrier, ftpc_scheduletrainid, ftpc_scheduledatetime, ftpc_scheduletimemillis, ftpc_estimatedetadatetime, ftpc_estimatedetatimemillis, ftpc_eventtimezone, ftpc_actualeventdatetime, ftpc_actualtimemillis, ftpc_scheduledaynumber, ftpc_schedulecutofftime, ftpc_schedulecutoffday, ftpc_operationmon, ftpc_operationtue, ftpc_operationwed, ftpc_operationthu, ftpc_operationfri, ftpc_operationsat, ftpc_operationsun, ftpc_comments, ftpc_actualeventcode, ftpc_actualtrainid, ftpc_optn_prfmnce_ind, ftpc_optn_prfmnce_minutes, ftpc_ovrl_prfmnce_ind, ftpc_ovrl_prfmnce_minutes, ftpc_consignee_id, ftpc_shipper_id, ftpc_close_ind, ftpc_clm_load_status, ftpc_clm_destination, id, modified_date, ev_date_time, trax_created_date_time, trax_updated_date_time, created_date, row_active, record_synced_datetime) VALUES(51183682, 1, 28, 'ARV', NULL, 'INTFREGAT', 'MO', 'KCS', NULL, '2024-04-17 23:30:00.000', 1713391200000, '2024-04-17 19:31:00.000', 1713376860000, 'EST', NULL, NULL, 8, NULL, NULL, 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', NULL, NULL, NULL, NULL, NULL, 'ONTIME', 239, 29024027, 931429, '0', NULL, NULL, 330288083, '2024-04-10 21:39:54.000', NULL, NULL, NULL, '2024-04-10 21:39:54.000', true, '2024-04-30 04:29:50.634');";

    /// Minimal record type that lets the stream run without writing real rows.
    #[derive(Clone, Debug)]
    struct MockData {
        id: i64,
        modified_date: NaiveDateTime,
    }

    #[async_trait]
    impl Upsert<MockData> for MockData {
        // Pretends to upsert: prints the batch size and reports one affected row.
        async fn upsert(
            _client: &Client,
            batch: Vec<MockData>,
            _statement: &Statement,
            _thread_id: i64,
        ) -> Result<u64, Error> {
            println!("data received, amount : {}", batch.len());
            Ok(1)
        }

        fn modified_date(&self) -> NaiveDateTime {
            self.modified_date
        }

        fn pkey(&self) -> i64 {
            self.id
        }
    }

    #[ignore = "only works with a database connection"]
    #[tokio::test]
    async fn test_upsert_quick_stream() {
        // NOTE(review): hard-coded local credentials; fine for an ignored local test,
        // but consider reading them from the environment.
        let mut db_config = tokio_postgres::Config::new();
        db_config
            .host("127.0.0.1")
            .port(5432)
            .user("production")
            .password("production")
            .dbname("analyticsdb")
            .connect_timeout(Duration::from_secs(30));

        let shutdown_token = CancellationToken::new();

        let mut holder_builder = QueryHolderBuilder::new();
        holder_builder
            .set_one(TEMPLATE_QUERY.to_owned())
            .set_two(TEMPLATE_QUERY.to_owned())
            .set_three(TEMPLATE_QUERY.to_owned())
            .set_four(TEMPLATE_QUERY.to_owned())
            .set_five(TEMPLATE_QUERY.to_owned())
            .set_six(TEMPLATE_QUERY.to_owned())
            .set_seven(TEMPLATE_QUERY.to_owned())
            .set_eight(TEMPLATE_QUERY.to_owned())
            .set_nine(TEMPLATE_QUERY.to_owned())
            .set_ten(TEMPLATE_QUERY.to_owned())
            .set_hundred(TEMPLATE_QUERY.to_owned());
        let query_holder = holder_builder.build();

        let mut stream_builder = QuickStreamBuilder::default();
        stream_builder
            .cancellation_tocken(shutdown_token)
            .max_connection_count(10)
            .buffer_size(10)
            .single_digits(1)
            .tens(2)
            .hundreds(1)
            .db_config(db_config)
            .queries(query_holder)
            .max_records_per_cycle_batch(100)
            .introduced_lag_cycles(1)
            .introduced_lag_in_millies(10)
            .connection_creation_threshold(25.0);
        let stream = stream_builder.build_update();

        let (tx, rx) = mpsc::channel::<Vec<MockData>>(100);
        let stream_task = tokio::spawn(async move {
            stream.run(rx).await;
        });

        // Two records with distinct keys and the same timestamp.
        let stamp = DateTime::from_timestamp(1627847280, 0).unwrap().naive_local();
        let batch: Vec<MockData> = (1..=2)
            .map(|id| MockData {
                id,
                modified_date: stamp,
            })
            .collect();

        tx.send(batch.clone()).await.unwrap();
        time::sleep(Duration::from_secs(5)).await;
        tx.send(batch).await.unwrap();

        // NOTE(review): `tx` is still alive here, so the channel never closes before
        // this await. If `run` only returns once the channel is closed or the token is
        // cancelled, this test will hang. Consider `drop(tx)` or cancelling a cloned
        // token first — TODO confirm against `run`.
        stream_task.await.unwrap();
    }
}