1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89
use std::fmt;
use async_trait::async_trait;
use rmpv::Value;
use super::{Connection, Transaction, TransactionBuilder};
use crate::{codec::request::EncodedRequest, Executor, Result};
/// Abstraction, providing sequential processing of requests.
///
/// With streams there is a guarantee that the server instance will not handle the next request in a stream until it has completed the previous one ([docs](https://www.tarantool.io/en/doc/latest/dev_guide/internals/box_protocol/#binary-protocol-streams)).
///
/// # Example
///
/// ```rust,compile
/// use tarantool_rs::{Connection, Executor, ExecutorExt};
/// # use futures::FutureExt;
/// # use rmpv::Value;
///
/// # async fn async_wrapper() {
/// let connection = Connection::builder().build("localhost:3301").await.unwrap();
///
/// // This will print 'fast' and then 'slow'
/// let eval_slow_fut = connection
/// .eval("fiber = require('fiber'); fiber.sleep(0.5); return ...;", ("slow", ))
/// .inspect(|res| println!("{:?}", res));
/// let eval_fast_fut = connection
/// .eval("return ...;", ("fast", ))
/// .inspect(|res| println!("{:?}", res));
/// let _ = tokio::join!(eval_slow_fut, eval_fast_fut);
///
/// // This will print 'slow' and then 'fast', since slow request was created first and have smaller sync
/// let stream = connection.stream();
/// let eval_slow_fut = stream
/// .eval("fiber = require('fiber'); fiber.sleep(0.5); return ...;", ("slow", ))
/// .inspect(|res| println!("{:?}", res));
/// let eval_fast_fut = stream
/// .eval("return ...;", ("fast", ))
/// .inspect(|res| println!("{:?}", res));
/// let _ = tokio::join!(eval_slow_fut, eval_fast_fut);
/// # }
/// ```
#[derive(Clone)]
pub struct Stream {
conn: Connection,
stream_id: u32,
}
// TODO: convert stream to transaction and back
impl Stream {
pub(crate) fn new(conn: Connection) -> Self {
let stream_id = conn.next_stream_id();
Self { conn, stream_id }
}
}
#[async_trait]
impl Executor for Stream {
async fn send_encoded_request(&self, mut request: EncodedRequest) -> Result<Value> {
request.stream_id = Some(self.stream_id);
self.conn.send_encoded_request(request).await
}
fn stream(&self) -> Stream {
self.conn.stream()
}
fn transaction_builder(&self) -> TransactionBuilder {
self.conn.transaction_builder()
}
async fn transaction(&self) -> Result<Transaction> {
self.conn.transaction().await
}
async fn get_cached_sql_statement_id(&self, statement: &str) -> Option<u64> {
self.conn.get_cached_sql_statement_id(statement).await
}
}
impl fmt::Debug for Stream {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Stream")
.field("stream_id", &self.stream_id)
.finish()
}
}