sql_tools/insert/oracle_sql/
mod.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use std::{sync::Arc, thread::{self, JoinHandle}};

use indicatif::ProgressBar;
use iter_grid::{divide_grid, iter_grid, iter_grid_pb};
use sql_fmt::insert_stmt;
use validation::{does_table_exist, get_col_indexes};

use crate::{create::{CreateColumns, CreateDataTypes, ModifyCreateTable}, data_types::SQLDataTypes, errors::Error, QueryBuilder, SQLVariation};

use super::InsertProps;

pub mod validation;
pub mod iter_grid;
pub mod sql_fmt;

pub(crate) fn oracle_build_insert(mut insert_props: InsertProps) -> Result<(), Error> {
    let conn_info = match insert_props.connect {
        SQLVariation::Oracle(oracle_connect) => oracle_connect,
    };
    let username_conn = conn_info.username.to_owned();
    let password_conn = conn_info.password.to_owned();
    let connection_string_conn = conn_info.connection_string.to_owned();
    
    let table_exist = does_table_exist(&insert_props.table, &conn_info)?;
    if !table_exist {
        let col_type_indexes = get_col_indexes(&insert_props.grid)?;
        let columns = &insert_props.header.iter().enumerate().map(|(idx, cell)|{
            if col_type_indexes.is_date.contains(&idx) {
                CreateColumns{ name: cell.to_string(), data_type: CreateDataTypes::DATE }
            } else if col_type_indexes.is_int.contains(&idx) {
                CreateColumns{ name: cell.to_string(), data_type: CreateDataTypes::NUMBER }
            } else if col_type_indexes.is_float.contains(&idx) {
                CreateColumns{ name: cell.to_string(), data_type: CreateDataTypes::FLOAT }
            } else {
                let size = if let Some(val) = col_type_indexes.varchar_size.get(&idx) { val } else { &(1 as usize) };
                CreateColumns{ name: cell.to_string(), data_type: CreateDataTypes::VARCHAR(*size) }
            } 
        }).collect::<Vec<CreateColumns>>();
        conn_info.create().table(&insert_props.table, columns.to_vec()).build()?;
    }

    let len = &insert_props.grid.len();
    let nthreads = num_cpus::get();
    let num = (len / nthreads + if len % nthreads == 0 { 0 } else { 1 }) as f32;

    let mut handles: Vec<JoinHandle<Result<(), Error>>> = Vec::new();
    for n in 0..nthreads {
        let data: Vec<Vec<SQLDataTypes>>;
        if n + 1 < nthreads { data = divide_grid(&mut insert_props.grid, num); } else { data = insert_props.grid.to_owned(); }
        // println!("Thread: {} Data:\n{:?}\n=========", n, data);
        let query = insert_stmt(insert_props.header.len(), &insert_props.table, &insert_props.header.join(", "));
        let username = username_conn.clone();
        let password = password_conn.clone();
        let connection_string = connection_string_conn.clone();
        handles.push(thread::spawn(move || {
            let conn: oracle::Connection = oracle::Connection::connect(username, password, connection_string).unwrap(); 
            let mut batch = conn.batch(&query, data.len()).build()?;
            iter_grid(&mut batch, data)?;
            conn.commit()?;
            Ok(())
        }))
    }

    for handle in handles {
        handle.join().unwrap()?;
    }

    Ok(())
}

pub(crate) fn oracle_build_insert_with_pb(mut insert_props: InsertProps) -> Result<(), Error> {
    let conn_info = match insert_props.connect {
        SQLVariation::Oracle(oracle_connect) => oracle_connect,
    };
    let username_conn = conn_info.username.to_owned();
    let password_conn = conn_info.password.to_owned();
    let connection_string_conn = conn_info.connection_string.to_owned();
    
    let table_exist = does_table_exist(&insert_props.table, &conn_info)?;
    if !table_exist {
        let col_type_indexes = get_col_indexes(&insert_props.grid)?;
        let columns = &insert_props.header.iter().enumerate().map(|(idx, cell)|{
            if col_type_indexes.is_date.contains(&idx) {
                CreateColumns{ name: cell.to_string(), data_type: CreateDataTypes::DATE }
            } else if col_type_indexes.is_int.contains(&idx) {
                CreateColumns{ name: cell.to_string(), data_type: CreateDataTypes::NUMBER }
            } else if col_type_indexes.is_float.contains(&idx) {
                CreateColumns{ name: cell.to_string(), data_type: CreateDataTypes::FLOAT }
            } else {
                let size = if let Some(val) = col_type_indexes.varchar_size.get(&idx) { val } else { &(1 as usize) };
                CreateColumns{ name: cell.to_string(), data_type: CreateDataTypes::VARCHAR(*size) }
            } 
        }).collect::<Vec<CreateColumns>>();
        conn_info.create().table(&insert_props.table, columns.to_vec()).build()?;
    }

    let len = &insert_props.grid.len();
    let progress_bar = ProgressBar::new(*len as u64);
    let nthreads = num_cpus::get();
    let num = (len / nthreads + if len % nthreads == 0 { 0 } else { 1 }) as f32;

    let pb = Arc::new(progress_bar);

    let mut handles: Vec<JoinHandle<Result<(), Error>>> = Vec::new();
    for n in 0..nthreads {
        let data: Vec<Vec<SQLDataTypes>>;
        if n + 1 < nthreads { data = divide_grid(&mut insert_props.grid, num); } else { data = insert_props.grid.to_owned(); }
        // println!("Thread: {} Data:\n{:?}\n=========", n, data);
        let query = insert_stmt(insert_props.header.len(), &insert_props.table, &insert_props.header.join(", "));
        let username = username_conn.clone();
        let password = password_conn.clone();
        let connection_string = connection_string_conn.clone();
        let pb = Arc::clone(&pb);
        handles.push(thread::spawn(move || {
            let conn: oracle::Connection = oracle::Connection::connect(username, password, connection_string).unwrap(); 
            let mut batch = conn.batch(&query, data.len()).build()?;
            iter_grid_pb(&mut batch, data, pb)?;
            conn.commit()?;
            Ok(())
        }))
    }

    for handle in handles {
        handle.join().unwrap()?;
    }

    Ok(())
}