quick_stream/
lib.rs

1use std::{collections::HashSet, time::Duration};
2
3use delete::Delete;
4use log::debug;
5use upsert::Upsert;
6
7pub mod builder;
8pub mod upsert;
9pub mod delete;
10pub mod shutdown_service;
11
12fn remove_upsert_duplicates<T>(data: &mut Vec<T>) where T: Upsert<T> + Clone + Send + 'static {
13    let mut hash_set = HashSet::new();
14    data.sort_by(|x, y| y.modified_date().cmp(&x.modified_date()));
15    data.retain(|data| hash_set.insert(data.pkey().clone()))
16}
17
18fn remove_delete_duplicates<T>(data: &mut Vec<T>) where T: Delete<T> + Clone + Send + 'static {
19    let mut hash_set = HashSet::new();
20    data.sort_by(|x, y| y.modified_date().cmp(&x.modified_date()));
21    data.retain(|data| hash_set.insert(data.pkey().clone()))
22}
23
/// Splits `data` into consecutive chunks: `hundreds` chunks of 100 elements,
/// then `tens` chunks of 10 elements, then one final chunk holding the last
/// `single_digit` elements. The final chunk is omitted when it would be empty.
/// Element order is preserved across the concatenation of all chunks.
///
/// Elements are moved, never cloned, and each element is moved exactly once.
///
/// # Panics
///
/// Panics with "Unreachable logic reached" if
/// `hundreds * 100 + tens * 10 + single_digit` does not equal `data.len()`
/// (including when that sum overflows `usize`). `split_vec` derives the counts
/// from the length, so this only fires on a caller bug.
fn split_vec_by_given<T>(data: Vec<T>, hundreds: usize, tens: usize, single_digit: usize) -> Vec<Vec<T>> {
    // Validate the whole shape up front (with overflow checks) instead of
    // discovering a mismatch halfway through the split.
    let expected_len = hundreds
        .checked_mul(100)
        .and_then(|h| tens.checked_mul(10).and_then(|t| h.checked_add(t)))
        .and_then(|ht| ht.checked_add(single_digit));
    if expected_len != Some(data.len()) {
        panic!("Unreachable logic reached")
    }

    let mut results = Vec::with_capacity(hundreds + tens + usize::from(single_digit > 0));

    // Drain everything through one iterator. The previous `split_at(..).to_vec()`
    // approach re-copied the entire remaining tail after every chunk.
    let mut remaining = data.into_iter();
    for _ in 0..hundreds {
        results.push(remaining.by_ref().take(100).collect());
    }
    for _ in 0..tens {
        results.push(remaining.by_ref().take(10).collect());
    }

    let tail: Vec<T> = remaining.collect();
    if !tail.is_empty() {
        results.push(tail);
    }
    results
}
48
49fn split_vec<T>(data: Vec<T>) -> Vec<Vec<T>> where T: Upsert<T> + Clone + Send + 'static {
50    let hundreds = data.len() / 100;
51    let hundreds_remainder = data.len() % 100;
52
53    let tens = hundreds_remainder / 10;
54    let tens_remainder = hundreds_remainder % 10;
55
56
57    split_vec_by_given(data, hundreds, tens, tens_remainder)
58}
59
60async fn introduce_lag(lag: u64) {
61    debug!("introducing lag: {}ms", lag);
62    tokio::time::sleep(Duration::from_millis(lag)).await;
63    debug!("introduced lag complete");
64}
65
66
67
#[cfg(test)]
mod tests {
    use crate::builder::{support::QueryHolderBuilder, QuickStreamBuilder};
    use crate::Upsert;
    use std::time::Duration;
    use chrono::DateTime;
    use tokio::sync::mpsc;
    use chrono::NaiveDateTime;
    use async_trait::async_trait;
    use tokio::time;
    use tokio_postgres::{Client, Statement, Error};
    use tokio_util::sync::CancellationToken;

    /// Minimal record type used to drive the upsert pipeline in tests.
    #[derive(Clone, Debug)]
    struct MockData {
        id: i64,
        modified_date: NaiveDateTime,
    }

    /// Test double: `upsert` does not touch the database. It only prints the
    /// batch size and reports `Ok(1)`.
    #[async_trait]
    impl Upsert<MockData> for MockData {
        async fn upsert(
            _client: &Client,
            data: Vec<MockData>,
            _statement: &Statement,
            _thread_id: i64,
        ) -> Result<u64, Error> {
            println!("data received, amount : {}", data.len());
            Ok(1)
        }

        fn modified_date(&self) -> NaiveDateTime {
            self.modified_date
        }

        fn pkey(&self) -> i64 {
            self.id
        }
    }

    /// End-to-end smoke test of the upsert stream. It expects a PostgreSQL
    /// instance at 127.0.0.1:5432 with the credentials configured below.
    /// It sends the same two-record batch twice, five seconds apart.
    #[ignore = "only works with a database connection"]
    #[tokio::test]
    async fn test_upsert_quick_stream() {
        let query = "INSERT INTO trax_production.ftp_current (ftpc_tripplannumber, ftpc_tripplanversion, ftpc_scheduleeventseq, ftpc_scheduleeventcode, ftpc_scheduleeventtype, ftpc_scheduleeventcity, ftpc_scheduleeventstate, ftpc_schedulerailcarrier, ftpc_scheduletrainid, ftpc_scheduledatetime, ftpc_scheduletimemillis, ftpc_estimatedetadatetime, ftpc_estimatedetatimemillis, ftpc_eventtimezone, ftpc_actualeventdatetime, ftpc_actualtimemillis, ftpc_scheduledaynumber, ftpc_schedulecutofftime, ftpc_schedulecutoffday, ftpc_operationmon, ftpc_operationtue, ftpc_operationwed, ftpc_operationthu, ftpc_operationfri, ftpc_operationsat, ftpc_operationsun, ftpc_comments, ftpc_actualeventcode, ftpc_actualtrainid, ftpc_optn_prfmnce_ind, ftpc_optn_prfmnce_minutes, ftpc_ovrl_prfmnce_ind, ftpc_ovrl_prfmnce_minutes, ftpc_consignee_id, ftpc_shipper_id, ftpc_close_ind, ftpc_clm_load_status, ftpc_clm_destination, id, modified_date, ev_date_time, trax_created_date_time, trax_updated_date_time, created_date, row_active, record_synced_datetime) VALUES(51183682, 1, 28, 'ARV', NULL, 'INTFREGAT', 'MO', 'KCS', NULL, '2024-04-17 23:30:00.000', 1713391200000, '2024-04-17 19:31:00.000', 1713376860000, 'EST', NULL, NULL, 8, NULL, NULL, 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', 'Y', NULL, NULL, NULL, NULL, NULL, 'ONTIME', 239, 29024027, 931429, '0', NULL, NULL, 330288083, '2024-04-10 21:39:54.000', NULL, NULL, NULL, '2024-04-10 21:39:54.000', true, '2024-04-30 04:29:50.634');".to_string();

        let mut config = tokio_postgres::Config::new();

        config
            .host("127.0.0.1")
            .port(5432)
            .user("production")
            .password("production")
            .dbname("analyticsdb")
            .connect_timeout(Duration::from_secs(30));

        let cancellation_token = CancellationToken::new();
        let mut query_holder_builder = QueryHolderBuilder::new();
        // The same statement is registered for every batch-size slot.
        query_holder_builder
            .set_one(query.to_owned())
            .set_two(query.to_owned())
            .set_three(query.to_owned())
            .set_four(query.to_owned())
            .set_five(query.to_owned())
            .set_six(query.to_owned())
            .set_seven(query.to_owned())
            .set_eight(query.to_owned())
            .set_nine(query.to_owned())
            .set_ten(query.to_owned())
            .set_hundred(query.to_owned());

        let query_holder = query_holder_builder.build();

        let mut quick_stream_builder = QuickStreamBuilder::default();

        // `cancellation_tocken` is the builder's method name as declared.
        quick_stream_builder
            .cancellation_tocken(cancellation_token)
            .max_connection_count(10)
            .buffer_size(10)
            .single_digits(1)
            .tens(2)
            .hundreds(1)
            .db_config(config)
            .queries(query_holder)
            .max_records_per_cycle_batch(100)
            .introduced_lag_cycles(1)
            .introduced_lag_in_millies(10)
            .connection_creation_threshold(25.0);

        let upsert_quick_stream = quick_stream_builder.build_update();

        let (tx, rx) = mpsc::channel::<Vec<MockData>>(100);
        let handle = tokio::spawn(async move {
            upsert_quick_stream.run(rx).await;
        });

        let data = vec![
            MockData {
                id: 1,
                modified_date: DateTime::from_timestamp(1627847280, 0).unwrap().naive_local(),
            },
            MockData {
                id: 2,
                modified_date: DateTime::from_timestamp(1627847280, 0).unwrap().naive_local(),
            },
        ];

        tx.send(data.clone()).await.unwrap();
        time::sleep(Duration::from_secs(5)).await;
        tx.send(data).await.unwrap();

        // Drop the last sender so the receiver can observe channel closure.
        // While `tx` is alive, `rx.recv()` never yields `None`. If `run` exits
        // on closure (not visible here), awaiting the handle would hang forever.
        drop(tx);
        handle.await.unwrap();
    }

}