1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
/// Postgres connection that talks to the database through the `pg_*` extern
/// functions declared in this file.
///
/// NOTE(review): presumably those functions are supplied by the embedding
/// host runtime — confirm.
pub struct PgConnection {
    /// Integer handle returned by `pg_connect`; passed as the first argument
    /// to `pg_query`, `pg_execute` and `pg_batch_execute`.
    conn: i32,
    /// Cache exposed through `GetPgMetadataCache::get_metadata_cache`.
    metadata_cache: diesel::pg::PgMetadataCache,
    /// State exposed through `Connection::transaction_state`.
    transaction_manager: diesel::connection::AnsiTransactionManager,
}

impl diesel::connection::SimpleConnection for PgConnection {
    /// Hands `query` to `pg_batch_execute` on this connection's handle and
    /// decodes the reply with `ft_sys::memory::json_from_ptr`.
    ///
    /// # Errors
    ///
    /// A `DbError` in the reply is converted with
    /// `ft_sys::diesel::db_error_to_diesel_error` and returned.
    fn batch_execute(&mut self, query: &str) -> diesel::QueryResult<()> {
        let (sql_ptr, sql_len) = ft_sys::memory::string_to_bytes_ptr(query.to_string());
        // NOTE(review): assumes `sql_ptr`/`sql_len` describe a buffer the
        // callee may read and that the returned value is a pointer
        // `json_from_ptr` understands — confirm against the host contract.
        let reply_ptr = unsafe { pg_batch_execute(self.conn, sql_ptr, sql_len) };
        let reply: Result<(), ft_sys_shared::DbError> =
            ft_sys::memory::json_from_ptr(reply_ptr);

        // `transaction_manager` is not touched on failure: the
        // `update_transaction_manager_status` helper is commented out in this
        // file, so the error is only converted.
        reply.map_err(ft_sys::diesel::db_error_to_diesel_error)
    }
}

// Marker impl with an empty body: it only opts `PgConnection` into
// `diesel::connection::ConnectionSealed`.
impl diesel::connection::ConnectionSealed for PgConnection {}

impl diesel::pg::GetPgMetadataCache for PgConnection {
    /// Returns a mutable borrow of this connection's `metadata_cache` field.
    fn get_metadata_cache(&mut self) -> &mut diesel::pg::PgMetadataCache {
        &mut self.metadata_cache
    }
}

/// Request payload built by `source_to_query` and passed, via
/// `ft_sys::memory::json_ptr`, to `pg_query` / `pg_execute`.
#[derive(serde::Serialize)]
struct Query {
    /// SQL text produced by the `PgQueryBuilder`.
    sql: String,
    /// One entry per bind parameter: the type OID (the default `0` when
    /// `oid()` yields no value) paired with the collected raw bytes.
    /// NOTE(review): `None` presumably encodes SQL NULL — confirm.
    binds: Vec<(u32, Option<Vec<u8>>)>,
}

// Foreign functions used by `PgConnection`. The descriptions below are taken
// from the call sites in this file.
// NOTE(review): presumably implemented by the embedding host — confirm.
extern "C" {
    /// Called with the pointer/length of the connection URL; the returned
    /// value is stored as `PgConnection::conn`.
    fn pg_connect(ptr: i32, len: i32) -> i32;
    /// Called with a connection handle and the pointer/length of a
    /// serialized `Query`; the returned pointer is decoded as
    /// `Result<ft_sys::diesel::Cursor, ft_sys_shared::DbError>`.
    fn pg_query(conn: i32, ptr: i32, len: i32) -> i32;
    /// Called with a connection handle and the pointer/length of a
    /// serialized `Query`; the returned pointer is decoded as
    /// `Result<usize, ft_sys_shared::DbError>`.
    fn pg_execute(conn: i32, ptr: i32, len: i32) -> i32;
    /// Called with a connection handle and the pointer/length of a SQL
    /// string; the returned pointer is decoded as
    /// `Result<(), ft_sys_shared::DbError>`.
    fn pg_batch_execute(conn: i32, ptr: i32, len: i32) -> i32;
}

/// Renders `source` into a `Query`: its SQL text plus one `(oid, bytes)` pair
/// per bind parameter.
///
/// The SQL is written by a `PgQueryBuilder`. Binds are gathered with a
/// `RawBytesBindCollector`, which is given `metadata_lookup`. A bind whose
/// metadata yields no OID is emitted with the default OID `0`.
///
/// # Errors
///
/// Propagates any error returned by `to_sql` or `collect_binds`.
fn source_to_query<T>(
    source: T,
    metadata_lookup: &mut <diesel::pg::Pg as diesel::sql_types::TypeMetadata>::MetadataLookup,
) -> diesel::QueryResult<Query>
where
    T: diesel::query_builder::QueryFragment<diesel::pg::Pg> + diesel::query_builder::QueryId,
{
    use diesel::query_builder::QueryBuilder;

    // Step 1: SQL text.
    let mut builder = diesel::pg::PgQueryBuilder::new();
    source.to_sql(&mut builder, &diesel::pg::Pg)?;
    let sql = builder.finish();

    // Step 2: bind values together with their type metadata.
    let mut collector = diesel::query_builder::bind_collector::RawBytesBindCollector::new();
    source.collect_binds(&mut collector, metadata_lookup, &diesel::pg::Pg)?;

    // Step 3: pair each bind with the OID taken from its metadata entry.
    let oids = collector
        .metadata
        .into_iter()
        .map(|meta| meta.oid().unwrap_or_default());
    let binds: Vec<(u32, Option<Vec<u8>>)> = oids.zip(collector.binds).collect();

    Ok(Query { sql, binds })
}

impl diesel::connection::LoadConnection for PgConnection {
    type Cursor<'conn, 'query> = ft_sys::diesel::Cursor;
    type Row<'conn, 'query> = ft_sys::diesel::PgRow;

    /// Converts `source` with `source_to_query`, sends it to `pg_query` on
    /// this connection's handle, and decodes the reply into a cursor.
    ///
    /// # Errors
    ///
    /// Propagates a failure from `source_to_query`. A `DbError` in the reply
    /// is converted with `ft_sys::diesel::db_error_to_diesel_error`.
    fn load<'conn, 'query, T>(
        &'conn mut self,
        source: T,
    ) -> diesel::QueryResult<Self::Cursor<'conn, 'query>>
    where
        T: diesel::query_builder::Query
            + diesel::query_builder::QueryFragment<Self::Backend>
            + diesel::query_builder::QueryId
            + 'query,
        Self::Backend: diesel::expression::QueryMetadata<T::SqlType>,
    {
        let request = source_to_query(source, self)?;
        let (req_ptr, req_len) = ft_sys::memory::json_ptr(request);
        // NOTE(review): assumes the pointer/length pair stays valid for the
        // duration of the call — confirm against the host contract.
        let reply_ptr = unsafe { pg_query(self.conn, req_ptr, req_len) };
        let reply: Result<ft_sys::diesel::Cursor, ft_sys_shared::DbError> =
            ft_sys::memory::json_from_ptr(reply_ptr);

        // `transaction_manager` is not touched on failure: the
        // `update_transaction_manager_status` helper is commented out in this
        // file, so the error is only converted.
        reply.map_err(ft_sys::diesel::db_error_to_diesel_error)
    }
}

// fn update_transaction_manager_status(
//     e: &diesel::result::Error,
//     transaction_manager: &mut diesel::connection::AnsiTransactionManager,
// ) {
//     if let diesel::result::Error::DatabaseError(DatabaseErrorKind::SerializationFailure, _) = e
//     {
//         transaction_manager
//             .status
//             .set_requires_rollback_maybe_up_to_top_level(true)
//     }
// }

impl diesel::connection::Connection for PgConnection {
    type Backend = diesel::pg::Pg;
    type TransactionManager = diesel::connection::AnsiTransactionManager;

    /// Passes `url` to `pg_connect` and wraps the returned handle together
    /// with a fresh metadata cache and a default transaction manager.
    ///
    /// This always returns `Ok`: the value from `pg_connect` is stored
    /// without being inspected.
    /// NOTE(review): if the host can signal a failed connect through that
    /// value, it goes undetected here — confirm.
    fn establish(url: &str) -> diesel::ConnectionResult<Self> {
        let (url_ptr, url_len) = ft_sys::memory::string_to_bytes_ptr(url.to_string());
        let conn = unsafe { pg_connect(url_ptr, url_len) };

        Ok(Self {
            conn,
            metadata_cache: diesel::pg::PgMetadataCache::new(),
            transaction_manager: Default::default(),
        })
    }

    /// Converts `source` with `source_to_query`, sends it to `pg_execute` on
    /// this connection's handle, and returns the `usize` decoded from the
    /// reply.
    ///
    /// # Errors
    ///
    /// Propagates a failure from `source_to_query`. A `DbError` in the reply
    /// is converted with `ft_sys::diesel::db_error_to_diesel_error`.
    fn execute_returning_count<T>(&mut self, source: &T) -> diesel::QueryResult<usize>
    where
        T: diesel::query_builder::QueryFragment<Self::Backend> + diesel::query_builder::QueryId,
    {
        let request = source_to_query(source, self)?;
        let (req_ptr, req_len) = ft_sys::memory::json_ptr(request);
        let reply_ptr = unsafe { pg_execute(self.conn, req_ptr, req_len) };
        let reply: Result<usize, ft_sys_shared::DbError> =
            ft_sys::memory::json_from_ptr(reply_ptr);

        // `transaction_manager` is not touched on failure: the
        // `update_transaction_manager_status` helper is commented out in this
        // file, so the error is only converted.
        reply.map_err(ft_sys::diesel::db_error_to_diesel_error)
    }

    /// Returns a mutable borrow of this connection's `transaction_manager`
    /// field.
    fn transaction_state(
        &mut self,
    ) -> &mut <Self::TransactionManager as diesel::connection::TransactionManager<Self>>::TransactionStateData{
        &mut self.transaction_manager
    }
}