tarantool_rs/client/
stream.rs

1use std::fmt;
2
3use async_trait::async_trait;
4
5use rmpv::Value;
6
7use super::{Connection, Transaction, TransactionBuilder};
8use crate::{codec::request::EncodedRequest, Executor, Result};
9
/// An abstraction that provides sequential processing of requests.
///
/// With streams there is a guarantee that the server instance will not handle the next request in a stream until it has completed the previous one ([docs](https://www.tarantool.io/en/doc/latest/dev_guide/internals/box_protocol/#binary-protocol-streams)).
///
/// # Example
///
/// ```rust,no_run
/// use tarantool_rs::{Connection, Executor, ExecutorExt};
/// # use futures::FutureExt;
/// # use rmpv::Value;
///
/// # async fn async_wrapper() {
/// let connection = Connection::builder().build("localhost:3301").await.unwrap();
///
/// // This will print 'fast' and then 'slow'
/// let eval_slow_fut = connection
///     .eval("fiber = require('fiber'); fiber.sleep(0.5); return ...;", ("slow", ))
///     .inspect(|res| println!("{:?}", res));
/// let eval_fast_fut = connection
///     .eval("return ...;", ("fast", ))
///     .inspect(|res| println!("{:?}", res));
/// let _ = tokio::join!(eval_slow_fut, eval_fast_fut);
///
/// // This will print 'slow' and then 'fast', since the slow request was created first and has a smaller sync
/// let stream = connection.stream();
/// let eval_slow_fut = stream
///     .eval("fiber = require('fiber'); fiber.sleep(0.5); return ...;", ("slow", ))
///     .inspect(|res| println!("{:?}", res));
/// let eval_fast_fut = stream
///     .eval("return ...;", ("fast", ))
///     .inspect(|res| println!("{:?}", res));
/// let _ = tokio::join!(eval_slow_fut, eval_fast_fut);
/// # }
/// ```
44
45#[derive(Clone)]
46pub struct Stream {
47    conn: Connection,
48    stream_id: u32,
49}
50
51// TODO: convert stream to transaction and back
52impl Stream {
53    pub(crate) fn new(conn: Connection) -> Self {
54        let stream_id = conn.next_stream_id();
55        Self { conn, stream_id }
56    }
57}
58
59#[async_trait]
60impl Executor for Stream {
61    async fn send_encoded_request(&self, mut request: EncodedRequest) -> Result<Value> {
62        request.stream_id = Some(self.stream_id);
63        self.conn.send_encoded_request(request).await
64    }
65
66    fn stream(&self) -> Stream {
67        self.conn.stream()
68    }
69
70    fn transaction_builder(&self) -> TransactionBuilder {
71        self.conn.transaction_builder()
72    }
73
74    async fn transaction(&self) -> Result<Transaction> {
75        self.conn.transaction().await
76    }
77
78    async fn get_cached_sql_statement_id(&self, statement: &str) -> Option<u64> {
79        self.conn.get_cached_sql_statement_id(statement).await
80    }
81}
82
83impl fmt::Debug for Stream {
84    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
85        f.debug_struct("Stream")
86            .field("stream_id", &self.stream_id)
87            .finish()
88    }
89}