1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
use std::future::Future;
use std::pin::Pin;

use bb8::{Pool, PooledConnection};
use tonic::Code;

use crate::result_set::ResultSet;
use crate::TimestampBound;
use crate::ToSpanner;
use crate::{session::SessionManager, ConfigBuilder, Connection, Error, TransactionSelector};

/// An asynchronous Cloud Spanner client.
///
/// Read-only contexts are obtained through [Client::read_only] and [Client::read_only_with_bound];
/// read/write transactions are run through the [TxRunner] returned by [Client::read_write].
pub struct Client {
    // Connection handle; a clone is handed to every context created from this client.
    connection: Box<dyn Connection>,
    // Session pool; a clone is handed to every context created from this client.
    session_pool: Pool<SessionManager>,
}

impl Client {
    /// Returns a new [ConfigBuilder] which can be used to configure how to connect to a Cloud Spanner instance and database.
    pub fn configure() -> ConfigBuilder {
        ConfigBuilder::default()
    }
}

impl Client {
    pub(crate) fn connect(
        connection: Box<dyn Connection>,
        session_pool: Pool<SessionManager>,
    ) -> Self {
        Self {
            connection,
            session_pool,
        }
    }

    /// Returns a [ReadContext] that can be used to read data out of Cloud Spanner.
    /// The returned context uses [TimestampBound::Strong] consistency for each individual read.
    pub fn read_only(&self) -> impl ReadContext {
        ReadOnly {
            connection: self.connection.clone(),
            bound: None,
            session_pool: self.session_pool.clone(),
        }
    }

    /// Returns a [ReadContext] that can be used to read data out of Cloud Spanner.
    /// The returned context uses the specified bounded consistency for each individual read.
    pub fn read_only_with_bound(&self, bound: TimestampBound) -> impl ReadContext {
        ReadOnly {
            connection: self.connection.clone(),
            bound: Some(bound),
            session_pool: self.session_pool.clone(),
        }
    }

    /// Returns a [TransactionContext] that can be used to both read and write data from/into Cloud Spanner.
    pub fn read_write(&mut self) -> TxRunner {
        TxRunner {
            connection: self.connection.clone(),
            session_pool: self.session_pool.clone(),
        }
    }
}

/// Defines the interface to read data out of Cloud Spanner.
#[async_trait::async_trait]
pub trait ReadContext {
    /// Executes a read-only SQL statement and returns the resulting [ResultSet].
    ///
    /// # Parameters
    ///
    /// As per the [Cloud Spanner documentation](https://cloud.google.com/spanner/docs/sql-best-practices#query-parameters), the statement may contain named parameters, e.g.: `@param_name`.
    /// When such parameters are present in the SQL query, their values must be provided in the second argument to this function
    /// as `(name, value)` pairs, where the name is given without the leading `@`.
    ///
    /// See [ToSpanner] to determine how Rust values can be mapped to Cloud Spanner values.
    ///
    /// # Errors
    ///
    /// If the parameter values do not line up with parameters in the statement, an [Error] is returned.
    ///
    /// # Example
    ///
    ///  ```rust
    ///  let my_id = 42;
    ///  let rs = client.read_only().execute_sql(
    ///      "SELECT id FROM person WHERE id > @my_id",
    ///      &[("my_id", &my_id)],
    ///  ).await?;
    ///  for row in rs.iter() {
    ///    let id: u32 = row.get("id");
    ///    println!("id: {}", id);
    ///  }
    ///  ```
    async fn execute_sql(
        &mut self,
        statement: &str,
        parameters: &[(&str, &(dyn ToSpanner + Sync))],
    ) -> Result<ResultSet, Error>;
}

/// The [ReadContext] implementation handed out by [Client::read_only] and [Client::read_only_with_bound].
///
/// Every statement is executed with a single-use transaction selector carrying `bound`.
struct ReadOnly {
    // Connection used to send statements.
    connection: Box<dyn Connection>,
    // `None` when created by `Client::read_only`, `Some` when a bound was explicitly requested.
    bound: Option<TimestampBound>,
    // Pool from which a session is obtained for each statement.
    session_pool: Pool<SessionManager>,
}

#[async_trait::async_trait]
impl ReadContext for ReadOnly {
    /// Runs `statement` with a single-use transaction selector.
    ///
    /// A session is obtained from the pool for the duration of this call, and the
    /// selector carries this context's (optional) timestamp bound.
    async fn execute_sql(
        &mut self,
        statement: &str,
        parameters: &[(&str, &(dyn ToSpanner + Sync))],
    ) -> Result<ResultSet, Error> {
        let session = self.session_pool.get().await?;
        let selector = TransactionSelector::SingleUse(self.bound.clone());

        let rows = self
            .connection
            .execute_sql(&session, &selector, statement, parameters)
            .await?;
        Ok(rows)
    }
}

/// Defines the interface to read from and write into Cloud Spanner.
///
/// This extends [ReadContext] to provide additional write functionalities.
#[async_trait::async_trait]
pub trait TransactionContext: ReadContext {
    /// Executes a DML SQL statement and returns the number of affected rows.
    ///
    /// # Parameters
    ///
    /// Like its [ReadContext::execute_sql] counterpart, this function also supports query parameters.
    ///
    /// # Errors
    ///
    /// The [Tx] implementation returns an [Error] when the result carries no row count,
    /// which may happen when the statement did not contain DML.
    ///
    /// # Example
    ///
    /// ```rust
    /// let id = 42;
    /// let name = "ferris";
    /// let rows = tx.execute_update(
    ///   "INSERT INTO person(id, name) VALUES (@id, @name)",
    ///   &[("id", &id), ("name", &name)]
    /// ).await?;
    /// println!("Inserted {} row", rows);
    /// ```
    async fn execute_update(
        &mut self,
        statement: &str,
        parameters: &[(&str, &(dyn ToSpanner + Sync))],
    ) -> Result<i64, Error>;
}

/// The read/write transaction context handed to the closure passed to [TxRunner::run].
///
/// It implements both [ReadContext] and [TransactionContext]; every statement executed
/// through it uses the same session and the current transaction selector.
pub struct Tx<'a> {
    // Connection used to send statements.
    connection: Box<dyn Connection>,
    // Session obtained from the pool by `TxRunner::run`.
    session: PooledConnection<'a, SessionManager>,
    // Starts as `Begin`; replaced by `Id` once a statement result carries a transaction.
    selector: TransactionSelector,
}

#[async_trait::async_trait]
impl<'a> ReadContext for Tx<'a> {
    /// Runs `statement` on this transaction's session using the current selector.
    ///
    /// While the selector is still `Begin`, a transaction found in the result is
    /// remembered so that subsequent statements reference it by id.
    async fn execute_sql(
        &mut self,
        statement: &str,
        parameters: &[(&str, &(dyn ToSpanner + Sync))],
    ) -> Result<ResultSet, Error> {
        let rows = self
            .connection
            .execute_sql(&self.session, &self.selector, statement, parameters)
            .await?;

        // TODO: this is brittle: any other method that talks to Cloud Spanner must also
        // capture the transaction, otherwise we risk never committing it.
        let still_beginning = matches!(self.selector, TransactionSelector::Begin);
        if still_beginning {
            if let Some(started) = rows.transaction.as_ref() {
                self.selector = TransactionSelector::Id(started.clone());
            }
        }

        Ok(rows)
    }
}

#[async_trait::async_trait]
impl<'a> TransactionContext for Tx<'a> {
    async fn execute_update(
        &mut self,
        statement: &str,
        parameters: &[(&str, &(dyn ToSpanner + Sync))],
    ) -> Result<i64, Error> {
        self.execute_sql(statement, parameters).await?
            .stats
            .row_count
            .ok_or_else(|| Error::Client("no row count available. This may be the result of using execute_update on a statement that did not contain DML.".to_string()))
    }
}

/// Allows running read/write transactions against Cloud Spanner.
///
/// Instances are created by [Client::read_write]; see [TxRunner::run] for the transaction lifecycle.
pub struct TxRunner {
    // Connection used to run statements and to commit or roll back the transaction.
    connection: Box<dyn Connection>,
    // Pool from which `run` obtains the session used by the transaction.
    session_pool: Pool<SessionManager>,
}

impl TxRunner {
    /// Runs arbitrary read / write operations against Cloud Spanner.
    ///
    /// This function takes care of read/write transaction management so that the application
    /// only has to provide the work to perform.
    ///
    /// # Begin
    ///
    /// The underlying transaction is created lazily. If the provided closure does no work against
    /// Cloud Spanner, then no transaction is created and nothing is committed or rolled back.
    ///
    /// # Commit / Rollback
    ///
    /// The underlying transaction is committed if the provided closure returns `Ok`.
    /// Conversely, any `Err` returned initiates a rollback.
    ///
    /// If the commit or rollback operation returns an unexpected error, then this function returns that error.
    ///
    /// # Retries
    ///
    /// When committing, Cloud Spanner may reject the transaction due to conflicts with another transaction.
    /// In these situations, Cloud Spanner allows retrying the transaction which will have a higher priority
    /// and potentially successfully commit.
    ///
    /// **NOTE:** the consequence of retrying is that the provided closure may be invoked multiple times.
    /// It is important to avoid doing any additional side effects within this closure as they will also
    /// potentially occur more than once.
    ///
    /// # Example
    ///
    /// ```rust
    /// async fn bump_version(id: u32) -> Result<u32, Error> {
    ///     client
    ///         .read_write()
    ///         .run(|tx| {
    ///             Box::pin(async move {
    ///                 let rs = tx
    ///                     .execute_sql(
    ///                         "SELECT MAX(version) FROM versions WHERE id = @id",
    ///                         &[("id", &id)],
    ///                     )
    ///                     .await?;
    ///                 let latest_version = rs.iter().next().unwrap().get::<u32>(0)?;
    ///                 let next_version = latest_version + 1;
    ///                 tx.execute_update(
    ///                     "INSERT INTO versions(id, version) VALUES(@id, @next_version)",
    ///                     &[("id", &id), ("next_version", &next_version)],
    ///                 )
    ///                 .await?;
    ///                 Ok(next_version)
    ///             })
    ///         })
    ///         .await
    /// }
    /// ```
    pub async fn run<'b, O, F>(&'b mut self, mut work: F) -> Result<O, Error>
    where
        F: for<'a> FnMut(&'a mut Tx<'b>) -> Pin<Box<dyn Future<Output = Result<O, Error>> + 'a>>,
    {
        // A single session is used for every attempt of this transaction.
        let session = self.session_pool.get().await?;
        let mut ctx = Tx {
            connection: self.connection.clone(),
            session,
            selector: TransactionSelector::Begin,
        };

        loop {
            // Every attempt starts without a transaction id; the first statement
            // executed by `work` switches the selector to `Id`.
            ctx.selector = TransactionSelector::Begin;
            let outcome = work(&mut ctx).await;

            // Only finalize when a transaction was actually started by the closure.
            let finalized = match ctx.selector {
                TransactionSelector::Id(tx) => {
                    if outcome.is_ok() {
                        self.connection.commit(&ctx.session, tx).await
                    } else {
                        self.connection.rollback(&ctx.session, tx).await
                    }
                }
                _ => Ok(()),
            };

            match finalized {
                Ok(()) => break outcome,
                // An aborted finalization re-runs the closure from scratch.
                Err(Error::Status(status)) if status.code() == Code::Aborted => continue,
                Err(err) => break Err(err),
            }
        }
    }
}