//! Stream abstraction for sending requests that the server processes sequentially.
use async_trait::async_trait;
use rmpv::Value;
use super::{Connection, ConnectionLike, Transaction, TransactionBuilder};
use crate::{codec::request::RequestBody, errors::Error};
/// Abstraction providing sequential processing of requests.
///
/// With streams there is a guarantee that the server instance will not handle the next request in a stream until it has completed the previous one ([docs](https://www.tarantool.io/en/doc/latest/dev_guide/internals/box_protocol/#binary-protocol-streams)).
///
/// A `Stream` holds a connection together with a stream id; that id is passed
/// along with every request sent through the stream. Cloning a `Stream` copies
/// the id, so the clone refers to the same stream id.
///
/// # Example
///
/// ```rust,compile
/// use tarantool_rs::{Connection, ConnectionLike};
/// # use futures::FutureExt;
/// # use rmpv::Value;
///
/// # async fn async_wrapper() {
/// let connection = Connection::builder().build("localhost:3301").await.unwrap();
///
/// // This will print 'fast' and then 'slow'
/// let eval_slow_fut = connection
///     .eval::<_, Value>("fiber = require('fiber'); fiber.sleep(0.5); return ...;", vec!["slow".into()])
///     .inspect(|res| println!("{:?}", res));
/// let eval_fast_fut = connection
///     .eval::<_, Value>("return ...;", vec!["fast".into()])
///     .inspect(|res| println!("{:?}", res));
/// let _ = tokio::join!(eval_slow_fut, eval_fast_fut);
///
/// // This will print 'slow' and then 'fast', since the slow request was created first and has a smaller sync
/// let stream = connection.stream();
/// let eval_slow_fut = stream
///     .eval::<_, Value>("fiber = require('fiber'); fiber.sleep(0.5); return ...;", vec!["slow".into()])
///     .inspect(|res| println!("{:?}", res));
/// let eval_fast_fut = stream
///     .eval::<_, Value>("return ...;", vec!["fast".into()])
///     .inspect(|res| println!("{:?}", res));
/// let _ = tokio::join!(eval_slow_fut, eval_fast_fut);
/// # }
/// ```
#[derive(Clone)]
pub struct Stream {
// Connection through which every request of this stream is sent.
conn: Connection,
// Stream id taken from `Connection::next_stream_id` at construction time
// and passed along with each request.
stream_id: u32,
}
// TODO: convert stream to transaction and back
impl Stream {
    /// Creates a stream on top of `conn`.
    ///
    /// The stream id is requested from the connection via
    /// `Connection::next_stream_id` before the connection is moved into the
    /// new value.
    pub(crate) fn new(conn: Connection) -> Self {
        Self {
            // Evaluated first (fields are initialised in written order), so
            // borrowing `conn` here is finished before it is moved below.
            stream_id: conn.next_stream_id(),
            conn,
        }
    }
}
#[async_trait(?Send)]
impl ConnectionLike for Stream {
    /// Sends `body` through the underlying connection, passing this stream's
    /// id along with the request.
    async fn send_request(&self, body: impl RequestBody) -> Result<Value, Error> {
        let stream_id = Some(self.stream_id);
        self.conn.send_request(body, stream_id).await
    }

    /// Delegates to the underlying connection's `stream()`.
    ///
    /// NOTE(review): presumably this yields a new stream with its own id
    /// rather than a copy of `self` — confirm against `Connection::stream`.
    fn stream(&self) -> Stream {
        self.conn.stream()
    }

    /// Returns the transaction builder produced by the underlying connection.
    fn transaction_builder(&self) -> TransactionBuilder {
        self.conn.transaction_builder()
    }

    /// Starts a transaction by delegating to the underlying connection.
    ///
    /// NOTE(review): this stream's id is not passed to the call, so the
    /// transaction does not appear to be tied to this stream — confirm.
    async fn transaction(&self) -> Result<Transaction, Error> {
        self.conn.transaction().await
    }
}